1
0

Compare commits

..

44 Commits

Author SHA1 Message Date
Richard van der Hoff
f56db48f67 Merge branch 'develop' into matrix-org-hotfixes 2018-04-13 11:27:29 +01:00
Erik Johnston
dd7f1b10ce Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-04-12 13:35:56 +01:00
Neil Johnson
661a4d4603 Merge branch 'develop' of https://github.com/matrix-org/synapse into matrix-org-hotfixes 2018-04-10 15:10:57 +01:00
Erik Johnston
a401750ef4 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-04-10 13:32:49 +01:00
Erik Johnston
72c673407f Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-04-09 16:11:43 +01:00
Richard van der Hoff
488a2696b6 Merge branch 'develop' into matrix-org-hotfixes 2018-04-06 09:29:15 +01:00
Richard van der Hoff
052df50e49 Merge branch 'develop' into matrix-org-hotfixes 2018-04-06 09:14:40 +01:00
Richard van der Hoff
3633ee8c63 Fix earlier logging patch
`@cached` doesn't work on decorated functions, because it uses inspection on
the target to calculate the number of arguments.
2018-04-04 23:20:30 +01:00
Richard van der Hoff
912c456d60 Logging for get_users_in_room 2018-04-04 17:30:02 +01:00
Richard van der Hoff
d05575a574 Increase member limiter to 20
Let's see if this makes the bridges go faster, or if it kills the synapse
master.
2018-04-04 16:14:02 +01:00
Richard van der Hoff
2fb6d46042 fix PEP8 2018-04-04 16:13:40 +01:00
Erik Johnston
c7fa6672d9 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-03-28 10:02:11 +01:00
Neil Johnson
36c43eef17 Merge branch 'develop' of https://github.com/matrix-org/synapse into matrix-org-hotfixes 2018-03-26 11:29:10 +01:00
Richard van der Hoff
411c437fc6 more simplejson 2018-03-15 23:54:03 +00:00
Erik Johnston
6783bc0ded Replace some ujson with simplejson to make it work 2018-03-15 23:40:23 +00:00
Erik Johnston
295973e3a9 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-03-14 14:43:28 +00:00
Richard van der Hoff
3b78452c3a Merge branch 'rav/request_logging' into matrix-org-hotfixes 2018-03-09 18:06:53 +00:00
Erik Johnston
1f732e05d0 Merge branch 'erikj/read_marker_caches' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-03-01 17:56:40 +00:00
Erik Johnston
9ed48cecbc Merge branch 'erikj/read_marker_caches' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-03-01 17:11:01 +00:00
Erik Johnston
cc9d4a4d3e Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-03-01 12:31:36 +00:00
hera
889803a78e increase sync cache to 2 minutes
to give synchrotrons being hammered by repeating initial /syncs to get more chance to actually complete and avoid a DoS
2018-02-28 00:35:03 +00:00
Erik Johnston
22245fb8a7 Actuall set cache factors in workers 2018-02-21 21:10:48 +00:00
Erik Johnston
78c5eca141 Add hacky cache factor override system 2018-02-21 21:00:24 +00:00
Richard van der Hoff
55bef59cc7 (Really) fix tablescan of event_push_actions on purge
commit 278d21b5 added new code to avoid the tablescan, but didn't remove the
old :/
2018-02-20 10:37:44 +00:00
Erik Johnston
ad683cd203 Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-02-15 14:10:11 +00:00
Erik Johnston
d66afef01e Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes 2018-02-15 13:03:05 +00:00
Richard van der Hoff
b07a33f024 Avoid doing presence updates on replication reconnect
Presence is supposed to be disabled on matrix.org, so we shouldn't send a load
of USER_SYNC commands every time the synchrotron reconnects to the master.
2018-02-15 09:51:09 +00:00
hera
74539aa25b Disable auth on room_members for now
because the moznet bridge is broken (https://github.com/matrix-org/matrix-appservice-irc/issues/506)
2018-02-15 09:51:09 +00:00
Erik Johnston
8f25cd6627 Bump LAST_SEEN_GRANULARITY in client_ips 2018-02-15 09:51:09 +00:00
Erik Johnston
dc1299a4b0 Prefill client_ip_last_seen in replication 2018-02-15 09:51:09 +00:00
Erik Johnston
2c72d66cda Move event sending to end in shutdown room admin api 2018-02-15 09:51:09 +00:00
Erik Johnston
cde90a89ed Add dummy presence REST handler to frontend proxy
The handler no-ops all requests as presence is disabled.
2018-02-15 09:51:09 +00:00
Erik Johnston
5aec53ad95 Don't intern type/state_keys in state store 2018-02-15 09:51:09 +00:00
Erik Johnston
d10d19f0ad Increase store._state_group_cache cache size 2018-02-15 09:51:09 +00:00
Erik Johnston
daec1d77be Make _get_joined_hosts_cache cache non-iterable 2018-02-15 09:51:09 +00:00
Erik Johnston
61885f7849 Increase MAX_EVENTS_BEHIND for replication clients 2018-02-15 09:51:09 +00:00
Erik Johnston
ba30d489d9 Disable presence in txn queue 2018-02-15 09:51:09 +00:00
Erik Johnston
5e2d0650df Handle exceptions in get_hosts_for_room when sending events over federation 2018-02-15 09:51:09 +00:00
Erik Johnston
841bcbcafa Limit concurrent AS joins 2018-02-15 09:51:09 +00:00
Erik Johnston
08a6b88e3d Disable presence
This reverts commit 0ebd376a53 and
disables presence a bit more
2018-02-15 09:51:09 +00:00
Erik Johnston
aece8e73b1 Make push actions rotation configurable 2018-02-15 09:51:09 +00:00
Mark Haines
328bd35e00 Deleting from event_push_actions needs to use an index 2018-02-15 09:51:09 +00:00
Erik Johnston
7de9a28b8e Disable auto search for prefixes in event search 2018-02-15 09:51:09 +00:00
Erik Johnston
2a9c3aea89 Add timeout to ResponseCache of /public_rooms 2018-02-15 09:51:09 +00:00
122 changed files with 1004 additions and 1835 deletions

View File

@@ -1,22 +1,14 @@
sudo: false
language: python
python: 2.7
# tell travis to cache ~/.cache/pip
cache: pip
matrix:
include:
- python: 2.7
env: TOX_ENV=packaging
- python: 2.7
env: TOX_ENV=pep8
- python: 2.7
env: TOX_ENV=py27
- python: 3.6
env: TOX_ENV=py36
env:
- TOX_ENV=packaging
- TOX_ENV=pep8
- TOX_ENV=py27
install:
- pip install tox

View File

@@ -1,117 +1,15 @@
Changes in synapse <unreleased>
===============================
Potentially breaking change:
* Make Client-Server API return 401 for invalid token (PR #3161).
This changes the Client-server spec to return a 401 error code instead of 403
when the access token is unrecognised. This is the behaviour required by the
specification, but some clients may be relying on the old, incorrect
behaviour.
Thanks to @NotAFile for fixing this.
Changes in synapse v0.28.1 (2018-05-01)
=======================================
SECURITY UPDATE
* Clamp the allowed values of event depth received over federation to be
[0, 2^63 - 1]. This mitigates an attack where malicious events
injected with depth = 2^63 - 1 render rooms unusable. Depth is used to
determine the cosmetic ordering of events within a room, and so the ordering
of events in such a room will default to using stream_ordering rather than depth
(topological_ordering).
This is a temporary solution to mitigate abuse in the wild, whilst a long term solution
is being implemented to improve how the depth parameter is used.
Full details at
https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI
* Pin Twisted to <18.4 until we stop using the private _OpenSSLECCurve API.
Changes in synapse v0.28.0 (2018-04-26)
=======================================
Bug Fixes:
* Fix quarantine media admin API and search reindex (PR #3130)
* Fix media admin APIs (PR #3134)
Changes in synapse v0.28.0-rc1 (2018-04-24)
===========================================
Minor performance improvement to federation sending and bug fixes.
(Note: This release does not include the delta state resolution implementation discussed in matrix live)
Features:
* Add metrics for event processing lag (PR #3090)
* Add metrics for ResponseCache (PR #3092)
Changes:
* Synapse on PyPy (PR #2760) Thanks to @Valodim!
* move handling of auto_join_rooms to RegisterHandler (PR #2996) Thanks to @krombel!
* Improve handling of SRV records for federation connections (PR #3016) Thanks to @silkeh!
* Document the behaviour of ResponseCache (PR #3059)
* Preparation for py3 (PR #3061, #3073, #3074, #3075, #3103, #3104, #3106, #3107, #3109, #3110) Thanks to @NotAFile!
* update prometheus dashboard to use new metric names (PR #3069) Thanks to @krombel!
* use python3-compatible prints (PR #3074) Thanks to @NotAFile!
* Send federation events concurrently (PR #3078)
* Limit concurrent event sends for a room (PR #3079)
* Improve R30 stat definition (PR #3086)
* Send events to ASes concurrently (PR #3088)
* Refactor ResponseCache usage (PR #3093)
* Clarify that SRV may not point to a CNAME (PR #3100) Thanks to @silkeh!
* Use str(e) instead of e.message (PR #3103) Thanks to @NotAFile!
* Use six.itervalues in some places (PR #3106) Thanks to @NotAFile!
* Refactor store.have_events (PR #3117)
Bug Fixes:
* Return 401 for invalid access_token on logout (PR #2938) Thanks to @dklug!
* Return a 404 rather than a 500 on rejoining empty rooms (PR #3080)
* fix federation_domain_whitelist (PR #3099)
* Avoid creating events with huge numbers of prev_events (PR #3113)
* Reject events which have lots of prev_events (PR #3118)
Changes in synapse v0.27.4 (2018-04-13)
======================================
Changes:
* Update canonicaljson dependency (#3095)
Changes in synapse v0.27.3 (2018-04-11)
======================================
Bug fixes:
* URL quote path segments over federation (#3082)
Changes in synapse v0.27.3-rc2 (2018-04-09)
==========================================
v0.27.3-rc1 used a stale version of the develop branch so the changelog overstates
the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored.
Changes in synapse v0.27.3-rc1 (2018-04-09)
=======================================
Notable changes include API support for joinability of groups. Also new metrics
Notable changes include API support for joinability of groups. Also new metrics
and phone home stats. Phone home stats include better visibility of system usage
so we can tweak synpase to work better for all users rather than our own experience
with matrix.org. Also, recording 'r30' stat which is the measure we use to track
with matrix.org. Also, recording 'r30' stat which is the measure we use to track
overal growth of the Matrix ecosystem. It is defined as:-
Counts the number of native 30 day retained users, defined as:-
@@ -147,6 +45,7 @@ Bug fixes:
* Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
* Fix replication after switch to simplejson (PR #3015)
* Fix replication after switch to simplejson (PR #3015)
* 404 correctly on missing paths via NoResource (PR #3022)
* Fix error when claiming e2e keys from offline servers (PR #3034)
* fix tests/storage/test_user_directory.py (PR #3042)

View File

@@ -157,8 +157,8 @@ if you prefer.
In case of problems, please see the _`Troubleshooting` section below.
Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate the
above in Docker at https://hub.docker.com/r/avhost/docker-matrix/tags/
Alternatively, Silvio Fricke has contributed a Dockerfile to automate the
above in Docker at https://registry.hub.docker.com/u/silviof/docker-matrix/.
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
tested with VirtualBox/AWS/DigitalOcean - see https://github.com/EMnify/matrix-synapse-auto-deploy
@@ -614,9 +614,6 @@ should have the format ``_matrix._tcp.<yourdomain.com> <ttl> IN SRV 10 0 <port>
$ dig -t srv _matrix._tcp.example.com
_matrix._tcp.example.com. 3600 IN SRV 10 0 8448 synapse.example.com.
Note that the server hostname cannot be an alias (CNAME record): it has to point
directly to the server hosting the synapse instance.
You can then configure your homeserver to use ``<yourdomain.com>`` as the domain in
its user-ids, by setting ``server_name``::

View File

@@ -1,10 +0,0 @@
Community Contributions
=======================
Everything in this directory are projects submitted by the community that may be useful
to others. As such, the project maintainers cannot guarantee support, stability
or backwards compatibility of these projects.
Files in this directory should *not* be relied on directly, as they may not
continue to work or exist in future. If you wish to use any of these files then
they should be copied to avoid them breaking from underneath you.

View File

@@ -2,9 +2,6 @@
# (e.g. https://www.archlinux.org/packages/community/any/matrix-synapse/ for ArchLinux)
# rather than in a user home directory or similar under virtualenv.
# **NOTE:** This is an example service file that may change in the future. If you
# wish to use this please copy rather than symlink it.
[Unit]
Description=Synapse Matrix homeserver
@@ -15,7 +12,6 @@ Group=synapse
WorkingDirectory=/var/lib/synapse
ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml
ExecStop=/usr/bin/synctl stop /etc/synapse/homeserver.yaml
# EnvironmentFile=-/etc/sysconfig/synapse # Can be used to e.g. set SYNAPSE_CACHE_FACTOR
[Install]
WantedBy=multi-user.target

View File

@@ -1,7 +1,5 @@
#! /bin/bash
set -eux
cd "`dirname $0`/.."
TOX_DIR=$WORKSPACE/.tox
@@ -16,20 +14,7 @@ fi
tox -e py27 --notest -v
TOX_BIN=$TOX_DIR/py27/bin
# cryptography 2.2 requires setuptools >= 18.5.
#
# older versions of virtualenv (?) give us a virtualenv with the same version
# of setuptools as is installed on the system python (and tox runs virtualenv
# under python3, so we get the version of setuptools that is installed on that).
#
# anyway, make sure that we have a recent enough setuptools.
$TOX_BIN/pip install 'setuptools>=18.5'
# we also need a semi-recent version of pip, because old ones fail to install
# the "enum34" dependency of cryptography.
$TOX_BIN/pip install 'pip>=10'
$TOX_BIN/pip install setuptools
{ python synapse/python_dependencies.py
echo lxml psycopg2
} | xargs $TOX_BIN/pip install

View File

@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.28.1"
__version__ = "0.27.3-rc2"

View File

@@ -16,9 +16,6 @@
"""Contains constants from the specification."""
# the "depth" field on events is limited to 2**63 - 1
MAX_DEPTH = 2**63 - 1
class Membership(object):

View File

@@ -18,7 +18,6 @@
import logging
import simplejson as json
from six import iteritems
logger = logging.getLogger(__name__)
@@ -298,7 +297,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
A dict representing the error response JSON.
"""
err = {"error": msg, "errcode": code}
for key, value in iteritems(kwargs):
for key, value in kwargs.iteritems():
err[key] = value
return err

View File

@@ -32,10 +32,10 @@ from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import reactor, defer
from twisted.internet import reactor
from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.appservice")
@@ -112,14 +112,9 @@ class ASReplicationHandler(ReplicationClientHandler):
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()
run_in_background(self._notify_app_services, max_stream_id)
@defer.inlineCallbacks
def _notify_app_services(self, room_stream_id):
try:
yield self.appservice_handler.notify_interested_services(room_stream_id)
except Exception:
logger.exception("Error notifying application services of event")
preserve_fn(
self.appservice_handler.notify_interested_services
)(max_stream_id)
def start(config_options):

View File

@@ -38,7 +38,7 @@ from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
@@ -229,7 +229,7 @@ class FederationSenderHandler(object):
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
run_in_background(self.update_token, token)
preserve_fn(self.update_token)(token)
# We also need to poke the federation sender when new events happen
elif stream_name == "events":
@@ -237,22 +237,19 @@ class FederationSenderHandler(object):
@defer.inlineCallbacks
def update_token(self, token):
try:
self.federation_position = token
self.federation_position = token
# We linearize here to ensure we don't have races updating the token
with (yield self._fed_position_linearizer.queue(None)):
if self._last_ack < self.federation_position:
yield self.store.update_federation_out_pos(
"federation", self.federation_position
)
# We linearize here to ensure we don't have races updating the token
with (yield self._fed_position_linearizer.queue(None)):
if self._last_ack < self.federation_position:
yield self.store.update_federation_out_pos(
"federation", self.federation_position
)
# We ACK this token over replication so that the master can drop
# its in memory queues
self.replication_client.send_federation_ack(self.federation_position)
self._last_ack = self.federation_position
except Exception:
logger.exception("Error updating federation stream position")
# We ACK this token over replication so that the master can drop
# its in memory queues
self.replication_client.send_federation_ack(self.federation_position)
self._last_ack = self.federation_position
if __name__ == '__main__':

View File

@@ -36,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -49,6 +50,35 @@ from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.frontend_proxy")
class PresenceStatusStubServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super(PresenceStatusStubServlet, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.auth = hs.get_auth()
self.main_uri = hs.config.worker_main_http_uri
@defer.inlineCallbacks
def on_GET(self, request, user_id):
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
headers = {
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri,
headers=headers,
)
defer.returnValue((200, result))
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
defer.returnValue((200, {}))
class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -135,6 +165,7 @@ class FrontendProxyServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
PresenceStatusStubServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,

View File

@@ -33,7 +33,7 @@ from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
@@ -140,27 +140,24 @@ class PusherReplicationHandler(ReplicationClientHandler):
def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
preserve_fn(self.poke_pushers)(stream_name, token, rows)
@defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows):
try:
if stream_name == "pushers":
for row in rows:
if row.deleted:
yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
logger.exception("Error poking pushers")
if stream_name == "pushers":
for row in rows:
if row.deleted:
yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
def stop_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)

View File

@@ -51,15 +51,13 @@ from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
from twisted.web.resource import NoResource
from six import iteritems
logger = logging.getLogger("synapse.app.synchrotron")
@@ -115,6 +113,7 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
return
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
def mark_as_coming_online(self, user_id):
@@ -212,8 +211,10 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
# presence is disabled on matrix.org, so we return the empty set
return set()
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
if count > 0
]
@@ -327,7 +328,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
preserve_fn(self.process_and_notify)(stream_name, token, rows)
def get_streams_to_replicate(self):
args = super(SyncReplicationHandler, self).get_streams_to_replicate()
@@ -339,58 +341,55 @@ class SyncReplicationHandler(ReplicationClientHandler):
@defer.inlineCallbacks
def process_and_notify(self, stream_name, token, rows):
try:
if stream_name == "events":
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
event = yield self.store.get_event(row.event_id)
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
max_token = self.store.get_room_max_stream_ordering()
self.notifier.on_new_room_event(
event, token, max_token, extra_users
)
elif stream_name == "push_rules":
self.notifier.on_new_event(
"push_rules_key", token, users=[row.user_id for row in rows],
if stream_name == "events":
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
event = yield self.store.get_event(row.event_id)
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
max_token = self.store.get_room_max_stream_ordering()
self.notifier.on_new_room_event(
event, token, max_token, extra_users
)
elif stream_name in ("account_data", "tag_account_data",):
elif stream_name == "push_rules":
self.notifier.on_new_event(
"push_rules_key", token, users=[row.user_id for row in rows],
)
elif stream_name in ("account_data", "tag_account_data",):
self.notifier.on_new_event(
"account_data_key", token, users=[row.user_id for row in rows],
)
elif stream_name == "receipts":
self.notifier.on_new_event(
"receipt_key", token, rooms=[row.room_id for row in rows],
)
elif stream_name == "typing":
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows],
)
elif stream_name == "to_device":
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
self.notifier.on_new_event(
"account_data_key", token, users=[row.user_id for row in rows],
"to_device_key", token, users=entities,
)
elif stream_name == "receipts":
self.notifier.on_new_event(
"receipt_key", token, rooms=[row.room_id for row in rows],
)
elif stream_name == "typing":
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows],
)
elif stream_name == "to_device":
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
self.notifier.on_new_event(
"to_device_key", token, users=entities,
)
elif stream_name == "device_lists":
all_room_ids = set()
for row in rows:
room_ids = yield self.store.get_rooms_for_user(row.user_id)
all_room_ids.update(room_ids)
self.notifier.on_new_event(
"device_list_key", token, rooms=all_room_ids,
)
elif stream_name == "presence":
yield self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows],
)
except Exception:
logger.exception("Error processing replication")
elif stream_name == "device_lists":
all_room_ids = set()
for row in rows:
room_ids = yield self.store.get_rooms_for_user(row.user_id)
all_room_ids.update(room_ids)
self.notifier.on_new_event(
"device_list_key", token, rooms=all_room_ids,
)
elif stream_name == "presence":
yield self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows],
)
def start(config_options):

View File

@@ -108,7 +108,7 @@ def stop(pidfile, app):
Worker = collections.namedtuple("Worker", [
"app", "configfile", "pidfile", "cache_factor"
"app", "configfile", "pidfile", "cache_factor", "cache_factors",
])
@@ -171,6 +171,10 @@ def main():
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
for cache_name, factor in cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
if options.worker:
start_stop_synapse = False
@@ -211,6 +215,10 @@ def main():
or pidfile
)
worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
worker_cache_factors = (
worker_config.get("synctl_cache_factors")
or cache_factors
)
daemonize = worker_config.get("daemonize") or config.get("daemonize")
assert daemonize, "Main process must have daemonize set to true"
@@ -226,8 +234,10 @@ def main():
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
worker_configfile, "worker_daemonize")
worker_cache_factor = worker_config.get("synctl_cache_factor")
worker_cache_factors = worker_config.get("synctl_cache_factors", {})
workers.append(Worker(
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
worker_cache_factors,
))
action = options.action
@@ -262,15 +272,19 @@ def main():
start(configfile)
for worker in workers:
env = os.environ.copy()
if worker.cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
for cache_name, factor in worker.cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
start_worker(worker.app, configfile, worker.configfile)
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
else:
os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
# Reset env back to the original
os.environ.clear()
os.environ.update(env)
if __name__ == "__main__":

View File

@@ -39,10 +39,10 @@ from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import reactor, defer
from twisted.internet import reactor
from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.user_dir")
@@ -164,14 +164,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
stream_name, token, rows
)
if stream_name == "current_state_deltas":
run_in_background(self._notify_directory)
@defer.inlineCallbacks
def _notify_directory(self):
try:
yield self.user_directory.notify_new_event()
except Exception:
logger.exception("Error notifiying user directory of state update")
preserve_fn(self.user_directory.notify_new_event)()
def start(config_options):

View File

@@ -18,6 +18,7 @@ from synapse.api.constants import ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException
from synapse.http.client import SimpleHttpClient
from synapse.events.utils import serialize_event
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.caches.response_cache import ResponseCache
from synapse.types import ThirdPartyInstanceID
@@ -193,7 +194,12 @@ class ApplicationServiceApi(SimpleHttpClient):
defer.returnValue(None)
key = (service.id, protocol)
return self.protocol_meta_cache.wrap(key, _get)
result = self.protocol_meta_cache.get(key)
if not result:
result = self.protocol_meta_cache.set(
key, preserve_fn(_get)()
)
return make_deferred_yieldable(result)
@defer.inlineCallbacks
def push_bulk(self, service, events, txn_id=None):

View File

@@ -51,7 +51,7 @@ components.
from twisted.internet import defer
from synapse.appservice import ApplicationServiceState
from synapse.util.logcontext import run_in_background
from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import Measure
import logging
@@ -106,7 +106,7 @@ class _ServiceQueuer(object):
def enqueue(self, service, event):
# if this service isn't being sent something
self.queued_events.setdefault(service.id, []).append(event)
run_in_background(self._send_request, service)
preserve_fn(self._send_request)(service)
@defer.inlineCallbacks
def _send_request(self, service):
@@ -152,10 +152,10 @@ class _TransactionController(object):
if sent:
yield txn.complete(self.store)
else:
run_in_background(self._start_recoverer, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._start_recoverer, service)
preserve_fn(self._start_recoverer)(service)
except Exception as e:
logger.exception(e)
preserve_fn(self._start_recoverer)(service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
@@ -176,20 +176,17 @@ class _TransactionController(object):
@defer.inlineCallbacks
def _start_recoverer(self, service):
try:
yield self.store.set_appservice_state(
service,
ApplicationServiceState.DOWN
)
logger.info(
"Application service falling behind. Starting recoverer. AS ID %s",
service.id
)
recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
recoverer.recover()
except Exception:
logger.exception("Error starting AS recoverer")
yield self.store.set_appservice_state(
service,
ApplicationServiceState.DOWN
)
logger.info(
"Application service falling behind. Starting recoverer. AS ID %s",
service.id
)
recoverer = self.recoverer_fn(service, self.on_recovered)
self.add_recoverers([recoverer])
recoverer.recover()
@defer.inlineCallbacks
def _is_service_up(self, service):

View File

@@ -281,15 +281,15 @@ class Config(object):
)
if not cls.path_exists(config_dir_path):
os.makedirs(config_dir_path)
with open(config_path, "w") as config_file:
config_str, config = obj.generate_config(
with open(config_path, "wb") as config_file:
config_bytes, config = obj.generate_config(
config_dir_path=config_dir_path,
server_name=server_name,
report_stats=(config_args.report_stats == "yes"),
is_generating_file=True
)
obj.invoke_all("generate_files", config)
config_file.write(config_str)
config_file.write(config_bytes)
print((
"A config file has been generated in %r for server name"
" %r with corresponding SSL keys and self-signed"

View File

@@ -17,11 +17,11 @@ from ._base import Config, ConfigError
from synapse.appservice import ApplicationService
from synapse.types import UserID
import urllib
import yaml
import logging
from six import string_types
from six.moves.urllib import parse as urlparse
logger = logging.getLogger(__name__)
@@ -105,7 +105,7 @@ def _load_appservice(hostname, as_info, config_filename):
)
localpart = as_info["sender_localpart"]
if urlparse.quote(localpart) != localpart:
if urllib.quote(localpart) != localpart:
raise ValueError(
"sender_localpart needs characters which are not URL encoded."
)

View File

@@ -117,7 +117,7 @@ class LoggingConfig(Config):
log_config = config.get("log_config")
if log_config and not os.path.exists(log_config):
log_file = self.abspath("homeserver.log")
with open(log_config, "w") as log_config_file:
with open(log_config, "wb") as log_config_file:
log_config_file.write(
DEFAULT_LOG_CONFIG.substitute(log_file=log_file)
)

View File

@@ -133,7 +133,7 @@ class TlsConfig(Config):
tls_dh_params_path = config["tls_dh_params_path"]
if not self.path_exists(tls_private_key_path):
with open(tls_private_key_path, "wb") as private_key_file:
with open(tls_private_key_path, "w") as private_key_file:
tls_private_key = crypto.PKey()
tls_private_key.generate_key(crypto.TYPE_RSA, 2048)
private_key_pem = crypto.dump_privatekey(
@@ -148,7 +148,7 @@ class TlsConfig(Config):
)
if not self.path_exists(tls_certificate_path):
with open(tls_certificate_path, "wb") as certificate_file:
with open(tls_certificate_path, "w") as certificate_file:
cert = crypto.X509()
subject = cert.get_subject()
subject.CN = config["server_name"]

View File

@@ -13,8 +13,8 @@
# limitations under the License.
from twisted.internet import ssl
from OpenSSL import SSL, crypto
from twisted.internet._sslverify import _defaultCurveName
from OpenSSL import SSL
from twisted.internet._sslverify import _OpenSSLECCurve, _defaultCurveName
import logging
@@ -32,9 +32,8 @@ class ServerContextFactory(ssl.ContextFactory):
@staticmethod
def configure_context(context, config):
try:
_ecCurve = crypto.get_elliptic_curve(_defaultCurveName)
context.set_tmp_ecdh(_ecCurve)
_ecCurve = _OpenSSLECCurve(_defaultCurveName)
_ecCurve.addECKeyToContext(context)
except Exception:
logger.exception("Failed to enable elliptic curve for TLS")
context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)

View File

@@ -19,8 +19,7 @@ from synapse.api.errors import SynapseError, Codes
from synapse.util import unwrapFirstError, logcontext
from synapse.util.logcontext import (
PreserveLoggingContext,
preserve_fn,
run_in_background,
preserve_fn
)
from synapse.util.metrics import Measure
@@ -128,7 +127,7 @@ class Keyring(object):
verify_requests.append(verify_request)
run_in_background(self._start_key_lookups, verify_requests)
preserve_fn(self._start_key_lookups)(verify_requests)
# Pass those keys to handle_key_deferred so that the json object
# signatures can be verified
@@ -147,56 +146,53 @@ class Keyring(object):
verify_requests (List[VerifyKeyRequest]):
"""
try:
# create a deferred for each server we're going to look up the keys
# for; we'll resolve them once we have completed our lookups.
# These will be passed into wait_for_previous_lookups to block
# any other lookups until we have finished.
# The deferreds are called with no logcontext.
server_to_deferred = {
rq.server_name: defer.Deferred()
for rq in verify_requests
}
# create a deferred for each server we're going to look up the keys
# for; we'll resolve them once we have completed our lookups.
# These will be passed into wait_for_previous_lookups to block
# any other lookups until we have finished.
# The deferreds are called with no logcontext.
server_to_deferred = {
rq.server_name: defer.Deferred()
for rq in verify_requests
}
# We want to wait for any previous lookups to complete before
# proceeding.
yield self.wait_for_previous_lookups(
[rq.server_name for rq in verify_requests],
server_to_deferred,
# We want to wait for any previous lookups to complete before
# proceeding.
yield self.wait_for_previous_lookups(
[rq.server_name for rq in verify_requests],
server_to_deferred,
)
# Actually start fetching keys.
self._get_server_verify_keys(verify_requests)
# When we've finished fetching all the keys for a given server_name,
# resolve the deferred passed to `wait_for_previous_lookups` so that
# any lookups waiting will proceed.
#
# map from server name to a set of request ids
server_to_request_ids = {}
for verify_request in verify_requests:
server_name = verify_request.server_name
request_id = id(verify_request)
server_to_request_ids.setdefault(server_name, set()).add(request_id)
def remove_deferreds(res, verify_request):
server_name = verify_request.server_name
request_id = id(verify_request)
server_to_request_ids[server_name].discard(request_id)
if not server_to_request_ids[server_name]:
d = server_to_deferred.pop(server_name, None)
if d:
d.callback(None)
return res
for verify_request in verify_requests:
verify_request.deferred.addBoth(
remove_deferreds, verify_request,
)
# Actually start fetching keys.
self._get_server_verify_keys(verify_requests)
# When we've finished fetching all the keys for a given server_name,
# resolve the deferred passed to `wait_for_previous_lookups` so that
# any lookups waiting will proceed.
#
# map from server name to a set of request ids
server_to_request_ids = {}
for verify_request in verify_requests:
server_name = verify_request.server_name
request_id = id(verify_request)
server_to_request_ids.setdefault(server_name, set()).add(request_id)
def remove_deferreds(res, verify_request):
server_name = verify_request.server_name
request_id = id(verify_request)
server_to_request_ids[server_name].discard(request_id)
if not server_to_request_ids[server_name]:
d = server_to_deferred.pop(server_name, None)
if d:
d.callback(None)
return res
for verify_request in verify_requests:
verify_request.deferred.addBoth(
remove_deferreds, verify_request,
)
except Exception:
logger.exception("Error starting key lookups")
@defer.inlineCallbacks
def wait_for_previous_lookups(self, server_names, server_to_deferred):
"""Waits for any previous key lookups for the given servers to finish.
@@ -317,7 +313,7 @@ class Keyring(object):
if not verify_request.deferred.called:
verify_request.deferred.errback(err)
run_in_background(do_iterations).addErrback(on_err)
preserve_fn(do_iterations)().addErrback(on_err)
@defer.inlineCallbacks
def get_keys_from_store(self, server_name_and_key_ids):
@@ -333,9 +329,8 @@ class Keyring(object):
"""
res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store.get_server_verify_keys,
server_name, key_ids,
preserve_fn(self.store.get_server_verify_keys)(
server_name, key_ids
).addCallback(lambda ks, server: (server, ks), server_name)
for server_name, key_ids in server_name_and_key_ids
],
@@ -357,13 +352,13 @@ class Keyring(object):
logger.exception(
"Unable to get key from %r: %s %s",
perspective_name,
type(e).__name__, str(e),
type(e).__name__, str(e.message),
)
defer.returnValue({})
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(get_key, p_name, p_keys)
preserve_fn(get_key)(p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
@@ -389,7 +384,7 @@ class Keyring(object):
logger.info(
"Unable to get key %r for %r directly: %s %s",
key_ids, server_name,
type(e).__name__, str(e),
type(e).__name__, str(e.message),
)
if not keys:
@@ -403,7 +398,7 @@ class Keyring(object):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(get_key, server_name, key_ids)
preserve_fn(get_key)(server_name, key_ids)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
@@ -486,8 +481,7 @@ class Keyring(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store_keys,
preserve_fn(self.store_keys)(
server_name=server_name,
from_server=perspective_name,
verify_keys=response_keys,
@@ -545,8 +539,7 @@ class Keyring(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store_keys,
preserve_fn(self.store_keys)(
server_name=key_server_name,
from_server=server_name,
verify_keys=verify_keys,
@@ -622,8 +615,7 @@ class Keyring(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store.store_server_keys_json,
preserve_fn(self.store.store_server_keys_json)(
server_name=server_name,
key_id=key_id,
from_server=server_name,
@@ -724,8 +716,7 @@ class Keyring(object):
# TODO(markjh): Store whether the keys have expired.
return logcontext.make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self.store.store_server_verify_key,
preserve_fn(self.store.store_server_verify_key)(
server_name, server_name, key.time_added, key
)
for key_id, key in verify_keys.items()
@@ -743,7 +734,7 @@ def _handle_key_deferred(verify_request):
except IOError as e:
logger.warn(
"Got IOError when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e),
server_name, type(e).__name__, str(e.message),
)
raise SynapseError(
502,
@@ -753,7 +744,7 @@ def _handle_key_deferred(verify_request):
except Exception as e:
logger.exception(
"Got Exception when downloading keys for %s: %s %s",
server_name, type(e).__name__, str(e),
server_name, type(e).__name__, str(e.message),
)
raise SynapseError(
401,

View File

@@ -47,26 +47,14 @@ class _EventInternalMetadata(object):
def _event_dict_property(key):
# We want to be able to use hasattr with the event dict properties.
# However, (on python3) hasattr expects AttributeError to be raised. Hence,
# we need to transform the KeyError into an AttributeError
def getter(self):
try:
return self._event_dict[key]
except KeyError:
raise AttributeError(key)
return self._event_dict[key]
def setter(self, v):
try:
self._event_dict[key] = v
except KeyError:
raise AttributeError(key)
self._event_dict[key] = v
def delete(self):
try:
del self._event_dict[key]
except KeyError:
raise AttributeError(key)
del self._event_dict[key]
return property(
getter,

View File

@@ -14,10 +14,7 @@
# limitations under the License.
import logging
import six
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import SynapseError, Codes
from synapse.api.errors import SynapseError
from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
@@ -193,23 +190,11 @@ def event_from_pdu_json(pdu_json, outlier=False):
FrozenEvent
Raises:
SynapseError: if the pdu is missing required fields or is otherwise
not a valid matrix event
SynapseError: if the pdu is missing required fields
"""
# we could probably enforce a bunch of other fields here (room_id, sender,
# origin, etc etc)
assert_params_in_request(pdu_json, ('event_id', 'type', 'depth'))
depth = pdu_json['depth']
if not isinstance(depth, six.integer_types):
raise SynapseError(400, "Depth %r not an intger" % (depth, ),
Codes.BAD_JSON)
if depth < 0:
raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
assert_params_in_request(pdu_json, ('event_id', 'type'))
event = FrozenEvent(
pdu_json
)

View File

@@ -19,8 +19,6 @@ import itertools
import logging
import random
from six.moves import range
from twisted.internet import defer
from synapse.api.constants import Membership
@@ -35,7 +33,7 @@ from synapse.federation.federation_base import (
import synapse.metrics
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
@@ -396,7 +394,7 @@ class FederationClient(FederationBase):
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
signed_events = seen_events.values()
else:
seen_events = yield self.store.have_seen_events(event_ids)
seen_events = yield self.store.have_events(event_ids)
signed_events = []
failed_to_fetch = set()
@@ -415,12 +413,11 @@ class FederationClient(FederationBase):
batch_size = 20
missing_events = list(missing_events)
for i in range(0, len(missing_events), batch_size):
for i in xrange(0, len(missing_events), batch_size):
batch = set(missing_events[i:i + batch_size])
deferreds = [
run_in_background(
self.get_pdu,
preserve_fn(self.get_pdu)(
destinations=random_server_list(),
event_id=e_id,
)

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -31,10 +30,9 @@ import synapse.metrics
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logutils import log_function
from six import iteritems
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -214,17 +212,16 @@ class FederationServer(FederationBase):
if not in_room:
raise AuthError(403, "Host not in room.")
# we grab the linearizer to protect ourselves from servers which hammer
# us. In theory we might already have the response to this query
# in the cache so we could return it without waiting for the linearizer
# - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer.
with (yield self._server_linearizer.queue((origin, room_id))):
resp = yield self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id, event_id,
)
result = self._state_resp_cache.get((room_id, event_id))
if not result:
with (yield self._server_linearizer.queue((origin, room_id))):
d = self._state_resp_cache.set(
(room_id, event_id),
preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
)
resp = yield make_deferred_yieldable(d)
else:
resp = yield make_deferred_yieldable(result)
defer.returnValue((200, resp))
@@ -428,9 +425,9 @@ class FederationServer(FederationBase):
"Claimed one-time-keys: %s",
",".join((
"%s for %s:%s" % (key_id, user_id, device_id)
for user_id, user_keys in iteritems(json_result)
for device_id, device_keys in iteritems(user_keys)
for key_id, _ in iteritems(device_keys)
for user_id, user_keys in json_result.iteritems()
for device_id, device_keys in user_keys.iteritems()
for key_id, _ in device_keys.iteritems()
)),
)
@@ -497,33 +494,13 @@ class FederationServer(FederationBase):
def _handle_received_pdu(self, origin, pdu):
""" Process a PDU received in a federation /send/ transaction.
If the event is invalid, then this method throws a FederationError.
(The error will then be logged and sent back to the sender (which
probably won't do anything with it), and other events in the
transaction will be processed as normal).
It is likely that we'll then receive other events which refer to
this rejected_event in their prev_events, etc. When that happens,
we'll attempt to fetch the rejected event again, which will presumably
fail, so those second-generation events will also get rejected.
Eventually, we get to the point where there are more than 10 events
between any new events and the original rejected event. Since we
only try to backfill 10 events deep on received pdu, we then accept the
new event, possibly introducing a discontinuity in the DAG, with new
forward extremities, so normal service is approximately returned,
until we try to backfill across the discontinuity.
Args:
origin (str): server which sent the pdu
pdu (FrozenEvent): received pdu
Returns (Deferred): completes with None
Raises: FederationError if the signatures / hash do not match, or
if the event was unacceptable for any other reason (eg, too large,
too many prev_events, couldn't find the prev_events)
"""
Raises: FederationError if the signatures / hash do not match
"""
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if origin != get_domain_from_id(pdu.event_id):

View File

@@ -40,8 +40,6 @@ from collections import namedtuple
import logging
from six import itervalues, iteritems
logger = logging.getLogger(__name__)
@@ -124,7 +122,7 @@ class FederationRemoteSendQueue(object):
user_ids = set(
user_id
for uids in itervalues(self.presence_changed)
for uids in self.presence_changed.itervalues()
for user_id in uids
)
@@ -278,7 +276,7 @@ class FederationRemoteSendQueue(object):
# stream position.
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
for ((destination, edu_key), pos) in iteritems(keyed_edus):
for ((destination, edu_key), pos) in keyed_edus.iteritems():
rows.append((pos, KeyedEduRow(
key=edu_key,
edu=self.keyed_edu[(destination, edu_key)],
@@ -311,7 +309,7 @@ class FederationRemoteSendQueue(object):
j = keys.bisect_right(to_token) + 1
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
for (destination, pos) in iteritems(device_messages):
for (destination, pos) in device_messages.iteritems():
rows.append((pos, DeviceRow(
destination=destination,
)))
@@ -530,19 +528,19 @@ def process_rows_for_federation(transaction_queue, rows):
if buff.presence:
transaction_queue.send_presence(buff.presence)
for destination, edu_map in iteritems(buff.keyed_edus):
for destination, edu_map in buff.keyed_edus.iteritems():
for key, edu in edu_map.items():
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=key,
)
for destination, edu_list in iteritems(buff.edus):
for destination, edu_list in buff.edus.iteritems():
for edu in edu_list:
transaction_queue.send_edu(
edu.destination, edu.edu_type, edu.content, key=None,
)
for destination, failure_list in iteritems(buff.failures):
for destination, failure_list in buff.failures.iteritems():
for failure in failure_list:
transaction_queue.send_failure(destination, failure)

View File

@@ -295,6 +295,7 @@ class TransactionQueue(object):
Args:
states (list(UserPresenceState))
"""
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
@@ -323,8 +324,6 @@ class TransactionQueue(object):
break
yield self._process_presence_inner(states_map.values())
except Exception:
logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False

View File

@@ -25,7 +25,7 @@ from synapse.http.servlet import (
)
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
from synapse.util.logcontext import run_in_background
from synapse.util.logcontext import preserve_fn
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
import functools
@@ -94,6 +94,12 @@ class Authenticator(object):
"signatures": {},
}
if (
self.federation_domain_whitelist is not None and
self.server_name not in self.federation_domain_whitelist
):
raise FederationDeniedError(self.server_name)
if content is not None:
json_request["content"] = content
@@ -132,12 +138,6 @@ class Authenticator(object):
json_request["origin"] = origin
json_request["signatures"].setdefault(origin, {})[key] = sig
if (
self.federation_domain_whitelist is not None and
origin not in self.federation_domain_whitelist
):
raise FederationDeniedError(origin)
if not json_request["signatures"]:
raise NoAuthenticationError(
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
@@ -152,18 +152,11 @@ class Authenticator(object):
# alive
retry_timings = yield self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings["retry_last_ts"]:
run_in_background(self._reset_retry_timings, origin)
logger.info("Marking origin %r as up", origin)
preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0)
defer.returnValue(origin)
@defer.inlineCallbacks
def _reset_retry_timings(self, origin):
try:
logger.info("Marking origin %r as up", origin)
yield self.store.set_destination_retry_timings(origin, 0, 0)
except Exception:
logger.exception("Error resetting retry timings on %s", origin)
class BaseFederationServlet(object):
REQUIRE_AUTH = True

View File

@@ -42,7 +42,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.types import get_domain_from_id
from synapse.util.logcontext import run_in_background
from synapse.util.logcontext import preserve_fn
from signedjson.sign import sign_json
@@ -165,35 +165,31 @@ class GroupAttestionRenewer(object):
@defer.inlineCallbacks
def _renew_attestation(group_id, user_id):
try:
if not self.is_mine_id(group_id):
destination = get_domain_from_id(group_id)
elif not self.is_mine_id(user_id):
destination = get_domain_from_id(user_id)
else:
logger.warn(
"Incorrectly trying to do attestations for user: %r in %r",
user_id, group_id,
)
yield self.store.remove_attestation_renewal(group_id, user_id)
return
attestation = self.attestations.create_attestation(group_id, user_id)
yield self.transport_client.renew_group_attestation(
destination, group_id, user_id,
content={"attestation": attestation},
if not self.is_mine_id(group_id):
destination = get_domain_from_id(group_id)
elif not self.is_mine_id(user_id):
destination = get_domain_from_id(user_id)
else:
logger.warn(
"Incorrectly trying to do attestations for user: %r in %r",
user_id, group_id,
)
yield self.store.remove_attestation_renewal(group_id, user_id)
return
yield self.store.update_attestation_renewal(
group_id, user_id, attestation
)
except Exception:
logger.exception("Error renewing attestation of %r in %r",
user_id, group_id)
attestation = self.attestations.create_attestation(group_id, user_id)
yield self.transport_client.renew_group_attestation(
destination, group_id, user_id,
content={"attestation": attestation},
)
yield self.store.update_attestation_renewal(
group_id, user_id, attestation
)
for row in rows:
group_id = row["group_id"]
user_id = row["user_id"]
run_in_background(_renew_attestation, group_id, user_id)
preserve_fn(_renew_attestation)(group_id, user_id)

View File

@@ -19,7 +19,7 @@ import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import (
make_deferred_yieldable, run_in_background,
make_deferred_yieldable, preserve_fn, run_in_background,
)
import logging
@@ -111,7 +111,9 @@ class ApplicationServicesHandler(object):
# Fork off pushes to these services
for service in services:
self.scheduler.submit_event_for_as(service, event)
preserve_fn(self.scheduler.submit_event_for_as)(
service, event
)
@defer.inlineCallbacks
def handle_room_events(events):
@@ -196,10 +198,7 @@ class ApplicationServicesHandler(object):
services = yield self._get_services_for_3pn(protocol)
results = yield make_deferred_yieldable(defer.DeferredList([
run_in_background(
self.appservice_api.query_3pe,
service, kind, protocol, fields,
)
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
for service in services
], consumeErrors=True))
@@ -260,15 +259,11 @@ class ApplicationServicesHandler(object):
event based on the service regex.
"""
services = self.store.get_app_services()
# we can't use a list comprehension here. Since python 3, list
# comprehensions use a generator internally. This means you can't yield
# inside of a list comprehension anymore.
interested_list = []
for s in services:
if (yield s.is_interested(event, self.store)):
interested_list.append(s)
interested_list = [
s for s in services if (
yield s.is_interested(event, self.store)
)
]
defer.returnValue(interested_list)
def _get_services_for_user(self, user_id):

View File

@@ -24,7 +24,7 @@ from synapse.api.errors import (
SynapseError, CodeMessageException, FederationDeniedError,
)
from synapse.types import get_domain_from_id, UserID
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@@ -139,9 +139,9 @@ class E2eKeysHandler(object):
failures[destination] = _exception_to_failure(e)
yield make_deferred_yieldable(defer.gatherResults([
run_in_background(do_remote_query, destination)
preserve_fn(do_remote_query)(destination)
for destination in remote_queries_not_in_cache
], consumeErrors=True))
]))
defer.returnValue({
"device_keys": results, "failures": failures,
@@ -242,9 +242,9 @@ class E2eKeysHandler(object):
failures[destination] = _exception_to_failure(e)
yield make_deferred_yieldable(defer.gatherResults([
run_in_background(claim_client_keys, destination)
preserve_fn(claim_client_keys)(destination)
for destination in remote_queries
], consumeErrors=True))
]))
logger.info(
"Claimed one-time-keys: %s",

View File

@@ -15,16 +15,8 @@
# limitations under the License.
"""Contains handlers for federation events."""
import itertools
import logging
import sys
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
import six
from six.moves import http_client
from twisted.internet import defer
from unpaddedbase64 import decode_base64
from ._base import BaseHandler
@@ -51,6 +43,10 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.distributor import user_joined_room
from twisted.internet import defer
import itertools
import logging
logger = logging.getLogger(__name__)
@@ -119,19 +115,6 @@ class FederationHandler(BaseHandler):
logger.debug("Already seen pdu %s", pdu.event_id)
return
# do some initial sanity-checking of the event. In particular, make
# sure it doesn't have hundreds of prev_events or auth_events, which
# could cause a huge state resolution or cascade of event fetches.
try:
self._sanity_check_event(pdu)
except SynapseError as err:
raise FederationError(
"ERROR",
err.code,
err.msg,
affected=pdu.event_id,
)
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
@@ -166,6 +149,10 @@ class FederationHandler(BaseHandler):
auth_chain = []
have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)
fetch_state = False
# Get missing pdus if necessary.
@@ -181,7 +168,7 @@ class FederationHandler(BaseHandler):
)
prevs = {e_id for e_id, _ in pdu.prev_events}
seen = yield self.store.have_seen_events(prevs)
seen = set(have_seen.keys())
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
@@ -209,7 +196,8 @@ class FederationHandler(BaseHandler):
# Update the set of things we've seen after trying to
# fetch the missing stuff
seen = yield self.store.have_seen_events(prevs)
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.iterkeys())
if not prevs - seen:
logger.info(
@@ -260,7 +248,8 @@ class FederationHandler(BaseHandler):
min_depth (int): Minimum depth of events to return.
"""
# We recalculate seen, since it may have changed.
seen = yield self.store.have_seen_events(prevs)
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.keys())
if not prevs - seen:
return
@@ -372,7 +361,9 @@ class FederationHandler(BaseHandler):
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}
seen_ids = yield self.store.have_seen_events(event_ids)
seen_ids = set(
(yield self.store.have_events(event_ids)).keys()
)
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
@@ -536,16 +527,9 @@ class FederationHandler(BaseHandler):
def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
This will attempt to get more events from the remote. If the other side
has no new events to offer, this will return an empty list.
As the events are received, we check their signatures, and also do some
sanity-checking on them. If any of the backfilled events are invalid,
this method throws a SynapseError.
TODO: make this more useful to distinguish failures of the remote
server from invalid events (there is probably no point in trying to
re-fetch invalid events from every other HS in the room.)
This will attempt to get more events from the remote. This may return
be successfull and still return no events if the other side has no new
events to offer.
"""
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
@@ -557,16 +541,6 @@ class FederationHandler(BaseHandler):
extremities=extremities,
)
# ideally we'd sanity check the events here for excess prev_events etc,
# but it's hard to reject events at this point without completely
# breaking backfill in the same way that it is currently broken by
# events whose signature we cannot verify (#3121).
#
# So for now we accept the events anyway. #3124 tracks this.
#
# for ev in events:
# self._sanity_check_event(ev)
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(e.event_id for e in events)
@@ -639,8 +613,7 @@ class FederationHandler(BaseHandler):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(
self.replication_layer.get_pdu,
logcontext.preserve_fn(self.replication_layer.get_pdu)(
[dest],
event_id,
outlier=True,
@@ -660,7 +633,7 @@ class FederationHandler(BaseHandler):
failed_to_fetch = missing_auth - set(auth_events)
seen_events = yield self.store.have_seen_events(
seen_events = yield self.store.have_events(
set(auth_events.keys()) | set(state_events.keys())
)
@@ -870,38 +843,6 @@ class FederationHandler(BaseHandler):
defer.returnValue(False)
def _sanity_check_event(self, ev):
"""
Do some early sanity checks of a received event
In particular, checks it doesn't have an excessive number of
prev_events or auth_events, which could cause a huge state resolution
or cascade of event fetches.
Args:
ev (synapse.events.EventBase): event to be checked
Returns: None
Raises:
SynapseError if the event does not pass muster
"""
if len(ev.prev_events) > 20:
logger.warn("Rejecting event %s which has %i prev_events",
ev.event_id, len(ev.prev_events))
raise SynapseError(
http_client.BAD_REQUEST,
"Too many prev_events",
)
if len(ev.auth_events) > 10:
logger.warn("Rejecting event %s which has %i auth_events",
ev.event_id, len(ev.auth_events))
raise SynapseError(
http_client.BAD_REQUEST,
"Too many auth_events",
)
@defer.inlineCallbacks
def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@@ -1026,7 +967,7 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.
logcontext.run_in_background(self._handle_queued_pdus, room_queue)
logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
defer.returnValue(True)
@@ -1516,21 +1457,18 @@ class FederationHandler(BaseHandler):
backfilled=backfilled,
)
except: # noqa: E722, as we reraise the exception this is fine.
tp, value, tb = sys.exc_info()
logcontext.run_in_background(
self.store.remove_push_actions_from_staging,
event.event_id,
)
six.reraise(tp, value, tb)
# Ensure that we actually remove the entries in the push actions
# staging area
logcontext.preserve_fn(
self.store.remove_push_actions_from_staging
)(event.event_id)
raise
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
event_stream_id, max_stream_id
)
defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1544,8 +1482,7 @@ class FederationHandler(BaseHandler):
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(
self._prep_event,
logcontext.preserve_fn(self._prep_event)(
origin,
ev_info["event"],
state=ev_info.get("state"),
@@ -1799,8 +1736,7 @@ class FederationHandler(BaseHandler):
event_key = None
if event_auth_events - current_state:
# TODO: can we use store.have_seen_events here instead?
have_events = yield self.store.get_seen_events_with_rejections(
have_events = yield self.store.have_events(
event_auth_events - current_state
)
else:
@@ -1823,12 +1759,12 @@ class FederationHandler(BaseHandler):
origin, event.room_id, event.event_id
)
seen_remotes = yield self.store.have_seen_events(
seen_remotes = yield self.store.have_events(
[e.event_id for e in remote_auth_chain]
)
for e in remote_auth_chain:
if e.event_id in seen_remotes:
if e.event_id in seen_remotes.keys():
continue
if e.event_id == event.event_id:
@@ -1855,7 +1791,7 @@ class FederationHandler(BaseHandler):
except AuthError:
pass
have_events = yield self.store.get_seen_events_with_rejections(
have_events = yield self.store.have_events(
[e_id for e_id, _ in event.auth_events]
)
seen_events = set(have_events.keys())
@@ -1874,8 +1810,7 @@ class FederationHandler(BaseHandler):
different_events = yield logcontext.make_deferred_yieldable(
defer.gatherResults([
logcontext.run_in_background(
self.store.get_event,
logcontext.preserve_fn(self.store.get_event)(
d,
allow_none=True,
allow_rejected=False,
@@ -1941,13 +1876,13 @@ class FederationHandler(BaseHandler):
local_auth_chain,
)
seen_remotes = yield self.store.have_seen_events(
seen_remotes = yield self.store.have_events(
[e.event_id for e in result["auth_chain"]]
)
# 3. Process any remote auth chain events we haven't seen.
for ev in result["auth_chain"]:
if ev.event_id in seen_remotes:
if ev.event_id in seen_remotes.keys():
continue
if ev.event_id == event.event_id:

View File

@@ -27,7 +27,7 @@ from synapse.types import (
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -166,8 +166,7 @@ class InitialSyncHandler(BaseHandler):
(messages, token), current_state = yield make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self.store.get_recent_events_for_room,
preserve_fn(self.store.get_recent_events_for_room)(
event.room_id,
limit=limit,
end_token=room_end_token,
@@ -373,6 +372,7 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
defer.returnValue([])
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,
@@ -392,10 +392,9 @@ class InitialSyncHandler(BaseHandler):
presence, receipts, (messages, token) = yield defer.gatherResults(
[
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
self.store.get_recent_events_for_room,
preserve_fn(get_presence)(),
preserve_fn(get_receipts)(),
preserve_fn(self.store.get_recent_events_for_room)(
room_id,
limit=limit,
end_token=now_token.room_key,

View File

@@ -13,16 +13,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import simplejson
import sys
from canonicaljson import encode_canonical_json
import six
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
@@ -31,7 +25,7 @@ from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
from synapse.util.logcontext import run_in_background
from synapse.util.logcontext import preserve_fn, run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.stringutils import random_string
@@ -40,6 +34,12 @@ from synapse.replication.http.send_event import send_event_to_master
from ._base import BaseHandler
from canonicaljson import encode_canonical_json
import logging
import random
import simplejson
logger = logging.getLogger(__name__)
@@ -384,7 +384,7 @@ class MessageHandler(BaseHandler):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or becuase there
# is a user in the room that the AS is "interested in"
if requester.app_service and user_id not in users_with_profile:
if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
@@ -433,7 +433,7 @@ class EventCreationHandler(object):
@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_events_and_hashes=None):
prev_event_ids=None):
"""
Given a dict from a client, create a new event.
@@ -447,13 +447,7 @@ class EventCreationHandler(object):
event_dict (dict): An entire event
token_id (str)
txn_id (str)
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
prev_event_ids (list): The prev event ids to use when creating the event
Returns:
Tuple of created event (FrozenEvent), Context
@@ -491,7 +485,7 @@ class EventCreationHandler(object):
event, context = yield self.create_new_client_event(
builder=builder,
requester=requester,
prev_events_and_hashes=prev_events_and_hashes,
prev_event_ids=prev_event_ids,
)
defer.returnValue((event, context))
@@ -594,48 +588,39 @@ class EventCreationHandler(object):
@measure_func("create_new_client_event")
@defer.inlineCallbacks
def create_new_client_event(self, builder, requester=None,
prev_events_and_hashes=None):
"""Create a new event for a local client
Args:
builder (EventBuilder):
requester (synapse.types.Requester|None):
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
the forward extremities to use as the prev_events for the
new event. For each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
If None, they will be requested from the database.
Returns:
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
"""
if prev_events_and_hashes is not None:
assert len(prev_events_and_hashes) <= 10, \
"Attempting to create an event with %i prev_events" % (
len(prev_events_and_hashes),
def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
if prev_event_ids:
prev_events = yield self.store.add_event_hashes(prev_event_ids)
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
depth = prev_max_depth + 1
else:
latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
builder.room_id,
)
else:
prev_events_and_hashes = \
yield self.store.get_prev_events_for_room(builder.room_id)
if prev_events_and_hashes:
depth = max([d for _, _, d in prev_events_and_hashes]) + 1
# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(depth, MAX_DEPTH)
else:
depth = 1
# We want to limit the max number of prev events we point to in our
# new event
if len(latest_ret) > 10:
# Sort by reverse depth, so we point to the most recent.
latest_ret.sort(key=lambda a: -a[2])
new_latest_ret = latest_ret[:5]
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in prev_events_and_hashes
]
# We also randomly point to some of the older events, to make
# sure that we don't completely ignore the older events.
if latest_ret[5:]:
sample_size = min(5, len(latest_ret[5:]))
new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
latest_ret = new_latest_ret
if latest_ret:
depth = max([d for _, _, d in latest_ret]) + 1
else:
depth = 1
prev_events = [
(event_id, prev_hashes)
for event_id, prev_hashes, _ in latest_ret
]
builder.prev_events = prev_events
builder.depth = depth
@@ -734,14 +719,8 @@ class EventCreationHandler(object):
except: # noqa: E722, as we reraise the exception this is fine.
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
tp, value, tb = sys.exc_info()
run_in_background(
self.store.remove_push_actions_from_staging,
event.event_id,
)
six.reraise(tp, value, tb)
preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
raise
@defer.inlineCallbacks
def persist_and_notify_client_event(
@@ -861,33 +840,22 @@ class EventCreationHandler(object):
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
run_in_background(
self.pusher_pool.on_new_notifications,
preserve_fn(self.pusher_pool.on_new_notifications)(
event_stream_id, max_stream_id
)
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
try:
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
except Exception:
logger.exception("Error notifying about new room event")
self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
run_in_background(_notify)
preserve_fn(_notify)()
if event.type == EventTypes.Message:
presence = self.hs.get_presence_handler()
# We don't want to block sending messages on any presence code. This
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
@defer.inlineCallbacks
def _bump_active_time(self, user):
try:
presence = self.hs.get_presence_handler()
yield presence.bump_presence_active_time(user)
except Exception:
logger.exception("Error bumping presence active time")
preserve_fn(presence.bump_presence_active_time)(requester.user)

View File

@@ -31,7 +31,7 @@ from synapse.storage.presence import UserPresenceState
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.async import Linearizer
from synapse.util.logcontext import run_in_background
from synapse.util.logcontext import preserve_fn
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -254,14 +254,6 @@ class PresenceHandler(object):
logger.info("Finished _persist_unpersisted_changes")
@defer.inlineCallbacks
def _update_states_and_catch_exception(self, new_states):
try:
res = yield self._update_states(new_states)
defer.returnValue(res)
except Exception:
logger.exception("Error updating presence")
@defer.inlineCallbacks
def _update_states(self, new_states):
"""Updates presence of users. Sets the appropriate timeouts. Pokes
@@ -372,7 +364,7 @@ class PresenceHandler(object):
now=now,
)
run_in_background(self._update_states_and_catch_exception, changes)
preserve_fn(self._update_states)(changes)
except Exception:
logger.exception("Exception in _handle_timeouts loop")
@@ -381,6 +373,7 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
return
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -410,6 +403,7 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
affect_presence = False
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -430,23 +424,20 @@ class PresenceHandler(object):
@defer.inlineCallbacks
def _end():
try:
if affect_presence:
self.user_to_num_current_syncs[user_id] -= 1
prev_state = yield self.current_state_for_user(user_id)
yield self._update_states([prev_state.copy_and_replace(
last_user_sync_ts=self.clock.time_msec(),
)])
except Exception:
logger.exception("Error updating presence after sync")
@contextmanager
def _user_syncing():
try:
yield
finally:
if affect_presence:
run_in_background(_end)
preserve_fn(_end)()
defer.returnValue(_user_syncing())
@@ -455,6 +446,8 @@ class PresenceHandler(object):
Returns:
set(str): A set of user_id strings.
"""
# presence is disabled on matrix.org, so we return the empty set
return set()
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
@@ -474,6 +467,7 @@ class PresenceHandler(object):
syncing_user_ids(set(str)): The set of user_ids that are
currently syncing on that server.
"""
return
# Grab the previous list of user_ids that were syncing on that process
prev_syncing_user_ids = (

View File

@@ -135,40 +135,37 @@ class ReceiptsHandler(BaseHandler):
"""Given a list of receipts, works out which remote servers should be
poked and pokes them.
"""
try:
# TODO: Some of this stuff should be coallesced.
for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
data = receipt["data"]
# TODO: Some of this stuff should be coallesced.
for receipt in receipts:
room_id = receipt["room_id"]
receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"]
event_ids = receipt["event_ids"]
data = receipt["data"]
users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name)
users = yield self.state.get_current_user_in_room(room_id)
remotedomains = set(get_domain_from_id(u) for u in users)
remotedomains = remotedomains.copy()
remotedomains.discard(self.server_name)
logger.debug("Sending receipt to: %r", remotedomains)
logger.debug("Sending receipt to: %r", remotedomains)
for domain in remotedomains:
self.federation.send_edu(
destination=domain,
edu_type="m.receipt",
content={
room_id: {
receipt_type: {
user_id: {
"event_ids": event_ids,
"data": data,
}
for domain in remotedomains:
self.federation.send_edu(
destination=domain,
edu_type="m.receipt",
content={
room_id: {
receipt_type: {
user_id: {
"event_ids": event_ids,
"data": data,
}
},
}
},
key=(room_id, receipt_type, user_id),
)
except Exception:
logger.exception("Error pushing receipts to remote servers")
},
key=(room_id, receipt_type, user_id),
)
@defer.inlineCallbacks
def get_receipts_for_room(self, room_id, to_key):

View File

@@ -15,13 +15,12 @@
from twisted.internet import defer
from six.moves import range
from ._base import BaseHandler
from synapse.api.constants import (
EventTypes, JoinRules,
)
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@@ -45,7 +44,7 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs, "room_list")
self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(hs, "remote_room_list",
timeout_ms=30 * 1000)
@@ -79,11 +78,18 @@ class RoomListHandler(BaseHandler):
)
key = (limit, since_token, network_tuple)
return self.response_cache.wrap(
key,
self._get_public_room_list,
limit, since_token, network_tuple=network_tuple,
)
result = self.response_cache.get(key)
if not result:
logger.info("No cached result, calculating one.")
result = self.response_cache.set(
key,
preserve_fn(self._get_public_room_list)(
limit, since_token, network_tuple=network_tuple
)
)
else:
logger.info("Using cached deferred result.")
return make_deferred_yieldable(result)
@defer.inlineCallbacks
def _get_public_room_list(self, limit=None, since_token=None,
@@ -202,7 +208,7 @@ class RoomListHandler(BaseHandler):
step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
chunk = []
for i in range(0, len(rooms_to_scan), step):
for i in xrange(0, len(rooms_to_scan), step):
batch = rooms_to_scan[i:i + step]
logger.info("Processing %i rooms for result", len(batch))
yield concurrently_execute(
@@ -417,14 +423,18 @@ class RoomListHandler(BaseHandler):
server_name, limit, since_token, include_all_networks,
third_party_instance_id,
)
return self.remote_response_cache.wrap(
key,
repl_layer.get_public_rooms,
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
include_all_networks=include_all_networks,
third_party_instance_id=third_party_instance_id,
)
result = self.remote_response_cache.get(key)
if not result:
result = self.remote_response_cache.set(
key,
repl_layer.get_public_rooms(
server_name, limit=limit, since_token=since_token,
search_filter=search_filter,
include_all_networks=include_all_networks,
third_party_instance_id=third_party_instance_id,
)
)
return result
class RoomListNextBatch(namedtuple("RoomListNextBatch", (

View File

@@ -28,7 +28,7 @@ from synapse.api.constants import (
)
from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.types import UserID, RoomID
from synapse.util.async import Linearizer
from synapse.util.async import Linearizer, Limiter
from synapse.util.distributor import user_left_room, user_joined_room
@@ -60,6 +60,7 @@ class RoomMemberHandler(object):
self.event_creation_hander = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
self.member_limiter = Limiter(20)
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -149,7 +150,7 @@ class RoomMemberHandler(object):
@defer.inlineCallbacks
def _local_membership_update(
self, requester, target, room_id, membership,
prev_events_and_hashes,
prev_event_ids,
txn_id=None,
ratelimit=True,
content=None,
@@ -175,7 +176,7 @@ class RoomMemberHandler(object):
},
token_id=requester.access_token_id,
txn_id=txn_id,
prev_events_and_hashes=prev_events_and_hashes,
prev_event_ids=prev_event_ids,
)
# Check if this event matches the previous membership event for the user.
@@ -232,18 +233,23 @@ class RoomMemberHandler(object):
):
key = (room_id,)
with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
)
as_id = object()
if requester.app_service:
as_id = requester.app_service.id
with (yield self.member_limiter.queue(as_id)):
with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
)
defer.returnValue(result)
@@ -314,12 +320,7 @@ class RoomMemberHandler(object):
403, "Invites have been disabled on this server",
)
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
room_id,
)
latest_event_ids = (
event_id for (event_id, _, _) in prev_events_and_hashes
)
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
current_state_ids = yield self.state_handler.get_current_state_ids(
room_id, latest_event_ids=latest_event_ids,
)
@@ -408,7 +409,7 @@ class RoomMemberHandler(object):
membership=effective_membership_state,
txn_id=txn_id,
ratelimit=ratelimit,
prev_events_and_hashes=prev_events_and_hashes,
prev_event_ids=latest_event_ids,
content=content,
)
defer.returnValue(res)

View File

@@ -15,7 +15,7 @@
from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute
from synapse.util.logcontext import LoggingContext
from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
from synapse.util.metrics import Measure, measure_func
from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
@@ -30,6 +30,7 @@ import itertools
logger = logging.getLogger(__name__)
SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@@ -52,7 +53,6 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
to tell if room needs to be part of the sync result.
"""
return bool(self.events)
__bool__ = __nonzero__ # python3
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
@@ -77,7 +77,6 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)
__bool__ = __nonzero__ # python3
class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
@@ -97,7 +96,6 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
or self.state
or self.account_data
)
__bool__ = __nonzero__ # python3
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
@@ -109,7 +107,6 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
def __nonzero__(self):
"""Invited rooms should always be reported to the client"""
return True
__bool__ = __nonzero__ # python3
class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
@@ -121,7 +118,6 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
def __nonzero__(self):
return bool(self.join or self.invite or self.leave)
__bool__ = __nonzero__ # python3
class DeviceLists(collections.namedtuple("DeviceLists", [
@@ -132,7 +128,6 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
def __nonzero__(self):
return bool(self.changed or self.left)
__bool__ = __nonzero__ # python3
class SyncResult(collections.namedtuple("SyncResult", [
@@ -165,7 +160,6 @@ class SyncResult(collections.namedtuple("SyncResult", [
self.device_lists or
self.groups
)
__bool__ = __nonzero__ # python3
class SyncHandler(object):
@@ -176,7 +170,10 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache(hs, "sync")
self.response_cache = ResponseCache(
hs, "sync",
timeout_ms=SYNC_RESPONSE_CACHE_MS,
)
self.state = hs.get_state_handler()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@@ -187,11 +184,15 @@ class SyncHandler(object):
Returns:
A Deferred SyncResult.
"""
return self.response_cache.wrap(
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config, since_token, timeout, full_state,
)
result = self.response_cache.get(sync_config.request_key)
if not result:
result = self.response_cache.set(
sync_config.request_key,
preserve_fn(self._wait_for_sync_for_user)(
sync_config, since_token, timeout, full_state
)
)
return make_deferred_yieldable(result)
@defer.inlineCallbacks
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
@@ -602,7 +603,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
if not block_all_presence_data:
if False and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)

View File

@@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import run_in_background
from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id
@@ -97,8 +97,7 @@ class TypingHandler(object):
if self.hs.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
run_in_background(
self._push_remote,
preserve_fn(self._push_remote)(
member=member,
typing=True
)
@@ -197,7 +196,7 @@ class TypingHandler(object):
def _push_update(self, member, typing):
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
run_in_background(self._push_remote, member, typing)
preserve_fn(self._push_remote)(member, typing)
self._push_update_local(
member=member,
@@ -206,31 +205,28 @@ class TypingHandler(object):
@defer.inlineCallbacks
def _push_remote(self, member, typing):
try:
users = yield self.state.get_current_user_in_room(member.room_id)
self._member_last_federation_poke[member] = self.clock.time_msec()
users = yield self.state.get_current_user_in_room(member.room_id)
self._member_last_federation_poke[member] = self.clock.time_msec()
now = self.clock.time_msec()
self.wheel_timer.insert(
now=now,
obj=member,
then=now + FEDERATION_PING_INTERVAL,
)
now = self.clock.time_msec()
self.wheel_timer.insert(
now=now,
obj=member,
then=now + FEDERATION_PING_INTERVAL,
)
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
self.federation.send_edu(
destination=domain,
edu_type="m.typing",
content={
"room_id": member.room_id,
"user_id": member.user_id,
"typing": typing,
},
key=member,
)
except Exception:
logger.exception("Error pushing typing notif to remotes")
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
self.federation.send_edu(
destination=domain,
edu_type="m.typing",
content={
"room_id": member.room_id,
"user_id": member.user_id,
"typing": typing,
},
key=member,
)
@defer.inlineCallbacks
def _recv_edu(self, origin, content):

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,24 +12,3 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet.defer import CancelledError
from twisted.python import failure
from synapse.api.errors import SynapseError
class RequestTimedOutError(SynapseError):
"""Exception representing timeout of an outbound request"""
def __init__(self):
super(RequestTimedOutError, self).__init__(504, "Timed out")
def cancelled_to_request_timed_out_error(value, timeout):
"""Turns CancelledErrors into RequestTimedOutErrors.
For use with async.add_timeout_to_deferred
"""
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise RequestTimedOutError()
return value

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,10 +18,9 @@ from OpenSSL.SSL import VERIFY_NONE
from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
from synapse.http import cancelled_to_request_timed_out_error
from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util import logcontext
import synapse.metrics
from synapse.http.endpoint import SpiderEndpoint
@@ -40,7 +38,7 @@ from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone
from six import StringIO
from StringIO import StringIO
import simplejson as json
import logging
@@ -97,17 +95,21 @@ class SimpleHttpClient(object):
# counters to it
outgoing_requests_counter.inc(method)
logger.info("Sending request %s %s", method, uri)
try:
def send_request():
request_deferred = self.agent.request(
method, uri, *args, **kwargs
)
add_timeout_to_deferred(
return self.clock.time_bound_deferred(
request_deferred,
60, cancelled_to_request_timed_out_error,
time_out=60,
)
response = yield make_deferred_yieldable(request_deferred)
logger.info("Sending request %s %s", method, uri)
try:
with logcontext.PreserveLoggingContext():
response = yield send_request()
incoming_responses_counter.inc(method, response.code)
logger.info(
@@ -507,7 +509,7 @@ class SpiderHttpClient(SimpleHttpClient):
reactor,
SpiderEndpointFactory(hs)
)
), [(b'gzip', GzipDecoder)]
), [('gzip', GzipDecoder)]
)
# We could look like Chrome:
# self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko)

View File

@@ -115,15 +115,10 @@ class _WrappedConnection(object):
if time.time() - self.last_request >= 2.5 * 60:
self.abort()
# Abort the underlying TLS connection. The abort() method calls
# loseConnection() on the TLS connection which tries to
# loseConnection() on the underlying TLS connection which tries to
# shutdown the connection cleanly. We call abortConnection()
# since that will promptly close the TLS connection.
#
# In Twisted >18.4; the TLS connection will be None if it has closed
# which will make abortConnection() throw. Check that the TLS connection
# is not None before trying to close it.
if self.transport.getHandle() is not None:
self.transport.abortConnection()
# since that will promptly close the underlying TCP connection.
self.transport.abortConnection()
def request(self, request):
self.last_request = time.time()
@@ -291,7 +286,7 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
if (len(answers) == 1
and answers[0].type == dns.SRV
and answers[0].payload
and answers[0].payload.target == dns.Name(b'.')):
and answers[0].payload.target == dns.Name('.')):
raise ConnectError("Service %s unavailable" % service_name)
for answer in answers:

View File

@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,19 +12,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.util.retryutils
from twisted.internet import defer, reactor, protocol
from twisted.internet.error import DNSLookupError
from twisted.web.client import readBody, HTTPConnectionPool, Agent
from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
import synapse.metrics
from synapse.util.async import sleep, add_timeout_to_deferred
from synapse.util.async import sleep
from synapse.util import logcontext
from synapse.util.logcontext import make_deferred_yieldable
import synapse.util.retryutils
import synapse.metrics
from canonicaljson import encode_canonical_json
@@ -41,7 +38,8 @@ import logging
import random
import sys
import urllib
from six.moves.urllib import parse as urlparse
import urlparse
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
@@ -186,20 +184,21 @@ class MatrixFederationHttpClient(object):
producer = body_callback(method, http_url_bytes, headers_dict)
try:
request_deferred = self.agent.request(
method,
url_bytes,
Headers(headers_dict),
producer
)
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(
request_deferred,
)
def send_request():
request_deferred = self.agent.request(
method,
url_bytes,
Headers(headers_dict),
producer
)
return self.clock.time_bound_deferred(
request_deferred,
time_out=timeout / 1000. if timeout else 60,
)
with logcontext.PreserveLoggingContext():
response = yield send_request()
log_result = "%d %s" % (response.code, response.phrase,)
break

View File

@@ -546,6 +546,6 @@ def _request_user_agent_is_curl(request):
b"User-Agent", default=[]
)
for user_agent in user_agents:
if b"curl" in user_agent:
if "curl" in user_agent:
return True
return False

View File

@@ -16,7 +16,6 @@
from itertools import chain
import logging
import re
logger = logging.getLogger(__name__)
@@ -57,7 +56,8 @@ class BaseMetric(object):
return not len(self.labels)
def _render_labelvalue(self, value):
return '"%s"' % (_escape_label_value(value),)
# TODO: escape backslashes, quotes and newlines
return '"%s"' % (value)
def _render_key(self, values):
if self.is_scalar():
@@ -299,29 +299,3 @@ class MemoryUsageMetric(object):
"process_psutil_rss:total %d" % sum_rss,
"process_psutil_rss:count %d" % len_rss,
]
def _escape_character(m):
"""Replaces a single character with its escape sequence.
Args:
m (re.MatchObject): A match object whose first group is the single
character to replace
Returns:
str
"""
c = m.group(1)
if c == "\\":
return "\\\\"
elif c == "\"":
return "\\\""
elif c == "\n":
return "\\n"
return c
def _escape_label_value(value):
"""Takes a label value and escapes quotes, newlines and backslashes
"""
return re.sub(r"([\n\"\\])", _escape_character, value)

View File

@@ -14,17 +14,14 @@
# limitations under the License.
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function
from synapse.util.async import (
ObservableDeferred, add_timeout_to_deferred,
DeferredTimeoutError,
)
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client
@@ -147,7 +144,6 @@ class _NotifierUserStream(object):
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
def __nonzero__(self):
return bool(self.events)
__bool__ = __nonzero__ # python3
class Notifier(object):
@@ -254,7 +250,9 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
run_in_background(self._notify_app_services, room_stream_id)
preserve_fn(self.appservice_handler.notify_interested_services)(
room_stream_id
)
if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)
@@ -268,13 +266,6 @@ class Notifier(object):
rooms=[event.room_id],
)
@defer.inlineCallbacks
def _notify_app_services(self, room_stream_id):
try:
yield self.appservice_handler.notify_interested_services(room_stream_id)
except Exception:
logger.exception("Error notifying application services of event")
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend event wise.
@@ -339,12 +330,11 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
add_timeout_to_deferred(
listener.deferred,
(end_time - now) / 1000.,
)
with PreserveLoggingContext():
yield listener.deferred
yield self.clock.time_bound_deferred(
listener.deferred,
time_out=(end_time - now) / 1000.
)
current_token = user_stream.current_token
@@ -355,7 +345,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
except DeferredTimeoutError:
except DeferredTimedOutError:
break
except defer.CancelledError:
break
@@ -560,14 +550,13 @@ class Notifier(object):
if end_time <= now:
break
add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
)
try:
with PreserveLoggingContext():
yield listener.deferred
except DeferredTimeoutError:
yield self.clock.time_bound_deferred(
listener.deferred,
time_out=(end_time - now) / 1000.
)
except DeferredTimedOutError:
break
except defer.CancelledError:
break

View File

@@ -77,13 +77,10 @@ class EmailPusher(object):
@defer.inlineCallbacks
def on_started(self):
if self.mailer is not None:
try:
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
yield self._process()
except Exception:
logger.exception("Error starting email pusher")
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
yield self._process()
def on_stop(self):
if self.timed_call:

View File

@@ -18,8 +18,8 @@ import logging
from twisted.internet import defer, reactor
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from . import push_rule_evaluator
from . import push_tools
import push_rule_evaluator
import push_tools
import synapse
from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
@@ -94,10 +94,7 @@ class HttpPusher(object):
@defer.inlineCallbacks
def on_started(self):
try:
yield self._process()
except Exception:
logger.exception("Error starting http pusher")
yield self._process()
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):

View File

@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .httppusher import HttpPusher
from httppusher import HttpPusher
import logging
logger = logging.getLogger(__name__)

View File

@@ -14,13 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.push.pusher import PusherFactory
from .pusher import PusherFactory
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
import logging
logger = logging.getLogger(__name__)
@@ -137,15 +137,12 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
run_in_background(
p.on_new_notifications,
min_stream_id, max_stream_id,
preserve_fn(p.on_new_notifications)(
min_stream_id, max_stream_id
)
)
yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True),
)
yield make_deferred_yieldable(defer.gatherResults(deferreds))
except Exception:
logger.exception("Exception in pusher on_new_notifications")
@@ -167,15 +164,10 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
run_in_background(
p.on_new_receipts,
min_stream_id, max_stream_id,
)
preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
)
yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True),
)
yield make_deferred_yieldable(defer.gatherResults(deferreds))
except Exception:
logger.exception("Exception in pusher on_new_receipts")
@@ -215,7 +207,7 @@ class PusherPool:
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
run_in_background(p.on_started)
preserve_fn(p.on_started)()
logger.info("Started pushers")

View File

@@ -1,6 +1,5 @@
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,31 +18,16 @@ from distutils.version import LooseVersion
logger = logging.getLogger(__name__)
# this dict maps from python package name to a list of modules we expect it to
# provide.
#
# the key is a "requirement specifier", as used as a parameter to `pip
# install`[1], or an `install_requires` argument to `setuptools.setup` [2].
#
# the value is a sequence of strings; each entry should be the name of the
# python module, optionally followed by a version assertion which can be either
# ">=<ver>" or "==<ver>".
#
# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers.
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
REQUIREMENTS = {
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
"frozendict>=0.4": ["frozendict"],
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
"canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
"canonicaljson>=1.0.0": ["canonicaljson>=1.0.0"],
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
"Twisted>=16.0.0": ["twisted>=16.0.0"],
# We use crypto.get_elliptic_curve which is only supported in >=0.15
"pyopenssl>=0.15": ["OpenSSL>=0.15"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
"pyyaml": ["yaml"],
"pyasn1": ["pyasn1"],
"daemonize": ["daemonize"],
@@ -55,7 +39,6 @@ REQUIREMENTS = {
"pymacaroons-pynacl": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {

View File

@@ -23,6 +23,7 @@ from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.util.async import sleep
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import Requester, UserID
@@ -117,12 +118,17 @@ class ReplicationSendEventRestServlet(RestServlet):
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
def on_PUT(self, request, event_id):
return self.response_cache.wrap(
event_id,
self._handle_request,
request
)
result = self.response_cache.get(event_id)
if not result:
result = self.response_cache.set(
event_id,
self._handle_request(request)
)
else:
logger.warn("Returning cached response")
return make_deferred_yieldable(result)
@preserve_fn
@defer.inlineCallbacks
def _handle_request(self, request):
with Measure(self.clock, "repl_send_event_parse"):

View File

@@ -42,6 +42,8 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
self.client_ip_last_seen.prefill(key, now)
self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)

View File

@@ -53,12 +53,12 @@ from twisted.internet import defer
from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure
from .commands import (
from commands import (
COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand,
NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand,
)
from .streams import STREAMS_MAP
from streams import STREAMS_MAP
from synapse.util.stringutils import random_string
from synapse.metrics.metric import CounterMetric

View File

@@ -18,8 +18,8 @@
from twisted.internet import defer, reactor
from twisted.internet.protocol import Factory
from .streams import STREAMS_MAP, FederationStream
from .protocol import ServerReplicationStreamProtocol
from streams import STREAMS_MAP, FederationStream
from protocol import ServerReplicationStreamProtocol
from synapse.util.metrics import Measure, measure_func

View File

@@ -33,7 +33,7 @@ import logging
logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 10000
MAX_EVENTS_BEHIND = 500000
EventStreamRow = namedtuple("EventStreamRow", (

View File

@@ -168,24 +168,11 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
yield self.store.find_first_stream_ordering_after_ts(ts)
)
room_event_after_stream_ordering = (
(_, depth, _) = (
yield self.store.get_room_event_after_stream_ordering(
room_id, stream_ordering,
)
)
if room_event_after_stream_ordering:
(_, depth, _) = room_event_after_stream_ordering
else:
logger.warn(
"[purge] purging events not possible: No event found "
"(received_ts %i => stream_ordering %i)",
ts, stream_ordering,
)
raise SynapseError(
404,
"there is no event to be purged",
errcode=Codes.NOT_FOUND,
)
logger.info(
"[purge] purging up to depth %i (received_ts %i => "
"stream_ordering %i)",
@@ -309,17 +296,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
)
new_room_id = info["room_id"]
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id)
@@ -357,6 +333,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
kicked_users.append(user_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
aliases_for_room = yield self.store.get_aliases_for_room(room_id)
yield self.store.update_aliases_for_room(

View File

@@ -52,10 +52,6 @@ class ClientV1RestServlet(RestServlet):
"""A base Synapse REST Servlet for the client version 1 API.
"""
# This subclass was presumably created to allow the auth for the v1
# protocol version to be different, however this behaviour was removed.
# it may no longer be necessary
def __init__(self, hs):
"""
Args:
@@ -63,5 +59,5 @@ class ClientV1RestServlet(RestServlet):
"""
self.hs = hs
self.builder_factory = hs.get_event_builder_factory()
self.auth = hs.get_auth()
self.auth = hs.get_v1auth()
self.txns = HttpTransactionCache(hs.get_clock())

View File

@@ -25,7 +25,7 @@ from .base import ClientV1RestServlet, client_path_patterns
import simplejson as json
import urllib
from six.moves.urllib import parse as urlparse
import urlparse
import logging
from saml2 import BINDING_HTTP_POST

View File

@@ -81,7 +81,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
except Exception:
raise SynapseError(400, "Unable to parse state")
yield self.presence_handler.set_state(user, state)
# yield self.presence_handler.set_state(user, state)
defer.returnValue((200, {}))

View File

@@ -150,7 +150,7 @@ class PushersRemoveRestServlet(RestServlet):
super(RestServlet, self).__init__()
self.hs = hs
self.notifier = hs.get_notifier()
self.auth = hs.get_auth()
self.auth = hs.get_v1auth()
self.pusher_pool = self.hs.get_pusherpool()
@defer.inlineCallbacks

View File

@@ -30,8 +30,6 @@ from hashlib import sha1
import hmac
import logging
from six import string_types
logger = logging.getLogger(__name__)
@@ -335,11 +333,11 @@ class RegisterRestServlet(ClientV1RestServlet):
def _do_shared_secret(self, request, register_json, session):
yield run_on_reactor()
if not isinstance(register_json.get("mac", None), string_types):
if not isinstance(register_json.get("mac", None), basestring):
raise SynapseError(400, "Expected mac.")
if not isinstance(register_json.get("user", None), string_types):
if not isinstance(register_json.get("user", None), basestring):
raise SynapseError(400, "Expected 'user' key.")
if not isinstance(register_json.get("password", None), string_types):
if not isinstance(register_json.get("password", None), basestring):
raise SynapseError(400, "Expected 'password' key.")
if not self.hs.config.registration_shared_secret:
@@ -360,14 +358,14 @@ class RegisterRestServlet(ClientV1RestServlet):
got_mac = str(register_json["mac"])
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret.encode(),
key=self.hs.config.registration_shared_secret,
digestmod=sha1,
)
want_mac.update(user)
want_mac.update(b"\x00")
want_mac.update("\x00")
want_mac.update(password)
want_mac.update(b"\x00")
want_mac.update(b"admin" if admin else b"notadmin")
want_mac.update("\x00")
want_mac.update("admin" if admin else "notadmin")
want_mac = want_mac.hexdigest()
if compare_digest(want_mac, got_mac):

View File

@@ -28,9 +28,8 @@ from synapse.http.servlet import (
parse_json_object_from_request, parse_string, parse_integer
)
from six.moves.urllib import parse as urlparse
import logging
import urllib
import simplejson as json
logger = logging.getLogger(__name__)
@@ -434,7 +433,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
as_client_event = "raw" not in request.args
filter_bytes = request.args.get("filter", None)
if filter_bytes:
filter_json = urlparse.unquote(filter_bytes[-1]).decode("UTF-8")
filter_json = urllib.unquote(filter_bytes[-1]).decode("UTF-8")
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None
@@ -719,8 +718,8 @@ class RoomTypingRestServlet(ClientV1RestServlet):
def on_PUT(self, request, room_id, user_id):
requester = yield self.auth.get_user_by_req(request)
room_id = urlparse.unquote(room_id)
target_user = UserID.from_string(urlparse.unquote(user_id))
room_id = urllib.unquote(room_id)
target_user = UserID.from_string(urllib.unquote(user_id))
content = parse_json_object_from_request(request)

View File

@@ -35,8 +35,6 @@ from hashlib import sha1
from synapse.util.async import run_on_reactor
from synapse.util.ratelimitutils import FederationRateLimiter
from six import string_types
# We ought to be using hmac.compare_digest() but on older pythons it doesn't
# exist. It's a _really minor_ security flaw to use plain string comparison
@@ -212,14 +210,14 @@ class RegisterRestServlet(RestServlet):
# in sessions. Pull out the username/password provided to us.
desired_password = None
if 'password' in body:
if (not isinstance(body['password'], string_types) or
if (not isinstance(body['password'], basestring) or
len(body['password']) > 512):
raise SynapseError(400, "Invalid password")
desired_password = body["password"]
desired_username = None
if 'username' in body:
if (not isinstance(body['username'], string_types) or
if (not isinstance(body['username'], basestring) or
len(body['username']) > 512):
raise SynapseError(400, "Invalid username")
desired_username = body['username']
@@ -245,7 +243,7 @@ class RegisterRestServlet(RestServlet):
access_token = get_access_token_from_request(request)
if isinstance(desired_username, string_types):
if isinstance(desired_username, basestring):
result = yield self._do_appservice_registration(
desired_username, access_token, body
)
@@ -466,7 +464,7 @@ class RegisterRestServlet(RestServlet):
# includes the password and admin flag in the hashed text. Why are
# these different?
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret.encode(),
key=self.hs.config.registration_shared_secret,
msg=user,
digestmod=sha1,
).hexdigest()

View File

@@ -28,7 +28,7 @@ import os
import logging
import urllib
from six.moves.urllib import parse as urlparse
import urlparse
logger = logging.getLogger(__name__)
@@ -143,7 +143,6 @@ def respond_with_responder(request, responder, media_type, file_size, upload_nam
respond_404(request)
return
logger.debug("Responding to media request with responder %s")
add_file_headers(request, media_type, file_size, upload_name)
with responder:
yield responder.write_to_consumer(request)

View File

@@ -47,7 +47,7 @@ import shutil
import cgi
import logging
from six.moves.urllib import parse as urlparse
import urlparse
logger = logging.getLogger(__name__)

View File

@@ -255,9 +255,7 @@ class FileResponder(Responder):
self.open_file = open_file
def write_to_consumer(self, consumer):
return make_deferred_yieldable(
FileSender().beginFileTransfer(self.open_file, consumer)
)
return FileSender().beginFileTransfer(self.open_file, consumer)
def __exit__(self, exc_type, exc_val, exc_tb):
self.open_file.close()

View File

@@ -35,7 +35,7 @@ from ._base import FileInfo
from synapse.api.errors import (
SynapseError, Codes,
)
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.stringutils import random_string
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.http.client import SpiderHttpClient
@@ -144,8 +144,7 @@ class PreviewUrlResource(Resource):
observable = self._cache.get(url)
if not observable:
download = run_in_background(
self._do_preview,
download = preserve_fn(self._do_preview)(
url, requester.user, ts,
)
observable = ObservableDeferred(

View File

@@ -18,7 +18,7 @@ from twisted.internet import defer, threads
from .media_storage import FileResponder
from synapse.config._base import Config
from synapse.util.logcontext import run_in_background
from synapse.util.logcontext import preserve_fn
import logging
import os
@@ -87,12 +87,7 @@ class StorageProviderWrapper(StorageProvider):
return self.backend.store_file(path, file_info)
else:
# TODO: Handle errors.
def store():
try:
return self.backend.store_file(path, file_info)
except Exception:
logger.exception("Error storing file")
run_in_background(store)
preserve_fn(self.backend.store_file)(path, file_info)
return defer.succeed(None)
def fetch(self, path, file_info):

View File

@@ -81,15 +81,15 @@ class UploadResource(Resource):
headers = request.requestHeaders
if headers.hasHeader("Content-Type"):
media_type = headers.getRawHeaders(b"Content-Type")[0]
media_type = headers.getRawHeaders("Content-Type")[0]
else:
raise SynapseError(
msg="Upload request missing 'Content-Type'",
code=400,
)
# if headers.hasHeader(b"Content-Disposition"):
# disposition = headers.getRawHeaders(b"Content-Disposition")[0]
# if headers.hasHeader("Content-Disposition"):
# disposition = headers.getRawHeaders("Content-Disposition")[0]
# TODO(markjh): parse content-dispostion
content_uri = yield self.media_repo.create_content(

View File

@@ -105,6 +105,7 @@ class HomeServer(object):
'federation_client',
'federation_server',
'handlers',
'v1auth',
'auth',
'state_handler',
'state_resolution_handler',
@@ -224,6 +225,15 @@ class HomeServer(object):
def build_simple_http_client(self):
return SimpleHttpClient(self)
def build_v1auth(self):
orf = Auth(self)
# Matrix spec makes no reference to what HTTP status code is returned,
# but the V1 API uses 403 where it means 401, and the webclient
# relies on this behaviour, so V1 gets its own copy of the auth
# with backwards compat behaviour.
orf.TOKEN_NOT_FOUND_HTTP_STATUS = 403
return orf
def build_state_handler(self):
return StateHandler(self)

View File

@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
LAST_SEEN_GRANULARITY = 120 * 1000
LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore):

View File

@@ -15,7 +15,7 @@
from ._base import IncorrectDatabaseSetup
from .postgres import PostgresEngine
from .sqlite3_engine import Sqlite3Engine
from .sqlite3 import Sqlite3Engine
import importlib
import platform

View File

@@ -15,7 +15,6 @@
from synapse.storage.prepare_database import prepare_database
import sqlite3
import struct
import threading
@@ -26,11 +25,6 @@ class Sqlite3Engine(object):
def __init__(self, database_module, database_config):
self.module = database_module
if sqlite3.sqlite_version_info < (3, 15, 0):
raise RuntimeError(
"SQLite3 version is too old, Synapse requires 3.15 or later",
)
# The current max state_group, or None if we haven't looked
# in the DB yet.
self._current_state_group_id = None

View File

@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
from twisted.internet import defer
@@ -25,9 +24,7 @@ from synapse.util.caches.descriptors import cached
from unpaddedbase64 import encode_base64
import logging
from six.moves.queue import PriorityQueue, Empty
from six.moves import range
from Queue import PriorityQueue, Empty
logger = logging.getLogger(__name__)
@@ -81,7 +78,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
front_list = list(front)
chunks = [
front_list[x:x + 100]
for x in range(0, len(front), 100)
for x in xrange(0, len(front), 100)
]
for chunk in chunks:
txn.execute(
@@ -136,47 +133,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
retcol="event_id",
)
@defer.inlineCallbacks
def get_prev_events_for_room(self, room_id):
"""
Gets a subset of the current forward extremities in the given room.
Limits the result to 10 extremities, so that we can avoid creating
events which refer to hundreds of prev_events.
Args:
room_id (str): room_id
Returns:
Deferred[list[(str, dict[str, str], int)]]
for each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
"""
res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
if len(res) > 10:
# Sort by reverse depth, so we point to the most recent.
res.sort(key=lambda a: -a[2])
# we use half of the limit for the actual most recent events, and
# the other half to randomly point to some of the older events, to
# make sure that we don't completely ignore the older events.
res = res[0:5] + random.sample(res[5:], 5)
defer.returnValue(res)
def get_latest_event_ids_and_hashes_in_room(self, room_id):
"""
Gets the current forward extremities in the given room
Args:
room_id (str): room_id
Returns:
Deferred[list[(str, dict[str, str], int)]]
for each event, a tuple of (event_id, hashes, depth)
where *hashes* is a map from algorithm to hash.
"""
return self.runInteraction(
"get_latest_event_ids_and_hashes_in_room",
self._get_latest_event_ids_and_hashes_in_room,
@@ -225,6 +182,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
room_id,
)
@defer.inlineCallbacks
def get_max_depth_of_events(self, event_ids):
sql = (
"SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
) % (",".join(["?"] * len(event_ids)),)
rows = yield self._execute(
"get_max_depth_of_events", None,
sql, *event_ids
)
if rows:
defer.returnValue(rows[0][0])
else:
defer.returnValue(1)
def _get_min_depth_interaction(self, txn, room_id):
min_depth = self._simple_select_one_onecol_txn(
txn,

View File

@@ -448,7 +448,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
"add_push_actions_to_staging", _add_push_actions_to_staging_txn
)
@defer.inlineCallbacks
def remove_push_actions_from_staging(self, event_id):
"""Called if we failed to persist the event to ensure that stale push
actions don't build up in the DB
@@ -457,22 +456,13 @@ class EventPushActionsWorkerStore(SQLBaseStore):
event_id (str)
"""
try:
res = yield self._simple_delete(
table="event_push_actions_staging",
keyvalues={
"event_id": event_id,
},
desc="remove_push_actions_from_staging",
)
defer.returnValue(res)
except Exception:
# this method is called from an exception handler, so propagating
# another exception here really isn't helpful - there's nothing
# the caller can do about it. Just log the exception and move on.
logger.exception(
"Error removing push actions after event persistence failure",
)
return self._simple_delete(
table="event_push_actions_staging",
keyvalues={
"event_id": event_id,
},
desc="remove_push_actions_from_staging",
)
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
@@ -623,6 +613,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
self._rotate_notifs, 30 * 60 * 1000
)
self._rotate_delay = 3
self._rotate_count = 10000
def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
all_events_and_contexts):
"""Handles moving push actions from staging table to main
@@ -819,7 +812,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
if caught_up:
break
yield sleep(5)
yield sleep(self._rotate_delay)
finally:
self._doing_notif_rotation = False
@@ -840,8 +833,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn.execute("""
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ?
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
""", (old_rotate_stream_ordering,))
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
""", (old_rotate_stream_ordering, self._rotate_count))
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row

View File

@@ -16,12 +16,12 @@
from collections import OrderedDict, deque, namedtuple
from functools import wraps
import itertools
import logging
import simplejson as json
from twisted.internet import defer
from synapse.storage.events_worker import EventsWorkerStore
from synapse.util.async import ObservableDeferred
from synapse.util.frozenutils import frozendict_json_encoder
@@ -424,9 +424,7 @@ class EventsStore(EventsWorkerStore):
)
current_state = yield self._get_new_state_after_events(
room_id,
ev_ctx_rm,
latest_event_ids,
new_latest_event_ids,
ev_ctx_rm, new_latest_event_ids,
)
if current_state is not None:
current_state_for_room[room_id] = current_state
@@ -514,8 +512,7 @@ class EventsStore(EventsWorkerStore):
defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
new_latest_event_ids):
def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids):
"""Calculate the current state dict after adding some new events to
a room
@@ -526,9 +523,6 @@ class EventsStore(EventsWorkerStore):
events_context (list[(EventBase, EventContext)]):
events and contexts which are being added to the room
old_latest_event_ids (iterable[str]):
the old forward extremities for the room.
new_latest_event_ids (iterable[str]):
the new forward extremities for the room.
@@ -539,89 +533,64 @@ class EventsStore(EventsWorkerStore):
"""
if not new_latest_event_ids:
return
defer.returnValue({})
# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}
for ev, ctx in events_context:
if ctx.state_group is None:
# I don't think this can happen, but let's double-check
raise Exception(
"Context for new extremity event %s has no state "
"group" % (ev.event_id, ),
)
if ctx.state_group in state_groups_map:
continue
state_groups_map[ctx.state_group] = ctx.current_state_ids
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can
# pull the state group from its context.
# Otherwise we need to pull the state group from the database.
# Set of events we need to fetch groups for. (We know none of the old
# extremities are going to be in events_context).
missing_event_ids = set(old_latest_event_ids)
event_id_to_state_group = {}
state_groups = {}
missing_event_ids = []
was_updated = False
for event_id in new_latest_event_ids:
# First search in the list of new events we're adding.
# First search in the list of new events we're adding,
# and then use the current state from that
for ev, ctx in events_context:
if event_id == ev.event_id:
event_id_to_state_group[event_id] = ctx.state_group
if ctx.current_state_ids is None:
raise Exception("Unknown current state")
if ctx.state_group is None:
# I don't think this can happen, but let's double-check
raise Exception(
"Context for new extremity event %s has no state "
"group" % (event_id, ),
)
# If we've already seen the state group don't bother adding
# it to the state sets again
if ctx.state_group not in state_groups:
state_groups[ctx.state_group] = ctx.current_state_ids
if ctx.delta_ids or hasattr(ev, "state_key"):
was_updated = True
break
else:
# If we couldn't find it, then we'll need to pull
# the state from the database
missing_event_ids.add(event_id)
was_updated = True
missing_event_ids.append(event_id)
if not was_updated:
return
if missing_event_ids:
# Now pull out the state groups for any missing events from DB
# Now pull out the state for any missing events from DB
event_to_groups = yield self._get_state_group_for_events(
missing_event_ids,
)
event_id_to_state_group.update(event_to_groups)
# State groups of old_latest_event_ids
old_state_groups = set(
event_id_to_state_group[evid] for evid in old_latest_event_ids
)
groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())
# State groups of new_latest_event_ids
new_state_groups = set(
event_id_to_state_group[evid] for evid in new_latest_event_ids
)
if groups:
group_to_state = yield self._get_state_for_groups(groups)
state_groups.update(group_to_state)
# If they old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
return
# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
missing_state = new_state_groups - set(state_groups_map)
if missing_state:
group_to_state = yield self._get_state_for_groups(missing_state)
state_groups_map.update(group_to_state)
if len(new_state_groups) == 1:
if len(state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
defer.returnValue(state_groups_map[new_state_groups.pop()])
# Ok, we need to defer to the state handler to resolve our state sets.
defer.returnValue(state_groups.values()[0])
def get_events(ev_ids):
return self.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
)
state_groups = {
sg: state_groups_map[sg] for sg in new_state_groups
}
events_map = {ev.event_id: ev for ev, _ in events_context}
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
@@ -1044,7 +1013,6 @@ class EventsStore(EventsWorkerStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@@ -1064,6 +1032,14 @@ class EventsStore(EventsWorkerStore):
[(ev.event_id,) for ev, _ in events_and_contexts]
)
for table in (
"event_push_actions",
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts]
)
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables
@@ -1351,49 +1327,13 @@ class EventsStore(EventsWorkerStore):
defer.returnValue(set(r["event_id"] for r in rows))
@defer.inlineCallbacks
def have_seen_events(self, event_ids):
def have_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Args:
event_ids (iterable[str]):
Returns:
Deferred[set[str]]: The events we have already seen.
"""
results = set()
def have_seen_events_txn(txn, chunk):
sql = (
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
% (",".join("?" * len(chunk)), )
)
txn.execute(sql, chunk)
for (event_id, ) in txn:
results.add(event_id)
# break the input up into chunks of 100
input_iterator = iter(event_ids)
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
[]):
yield self.runInteraction(
"have_seen_events",
have_seen_events_txn,
chunk,
)
defer.returnValue(results)
def get_seen_events_with_rejections(self, event_ids):
"""Given a list of event ids, check if we rejected them.
Args:
event_ids (list[str])
Returns:
Deferred[dict[str, str|None):
Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps
to None.
dict: Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps to
None.
"""
if not event_ids:
return defer.succeed({})
@@ -1415,7 +1355,9 @@ class EventsStore(EventsWorkerStore):
return res
return self.runInteraction("get_rejection_reasons", f)
return self.runInteraction(
"have_events", f,
)
@defer.inlineCallbacks
def count_daily_messages(self):

View File

@@ -20,7 +20,7 @@ from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.util.logcontext import (
PreserveLoggingContext, make_deferred_yieldable, run_in_background,
preserve_fn, PreserveLoggingContext, make_deferred_yieldable
)
from synapse.util.metrics import Measure
from synapse.api.errors import SynapseError
@@ -319,8 +319,7 @@ class EventsWorkerStore(SQLBaseStore):
res = yield make_deferred_yieldable(defer.gatherResults(
[
run_in_background(
self._get_event_from_row,
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"],
)

View File

@@ -22,8 +22,6 @@ from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from six.moves import range
class RegistrationWorkerStore(SQLBaseStore):
@cached()
@@ -471,7 +469,7 @@ class RegistrationStore(RegistrationWorkerStore,
match = regex.search(user_id)
if match:
found.add(int(match.group(1)))
for i in range(len(found) + 1):
for i in xrange(len(found) + 1):
if i not in found:
return i

View File

@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
# Convert the IDs to MXC URIs
for media_id in local_mxcs:
local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
for hostname, media_id in remote_mxcs:
remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
@@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
while next_token:
sql = """
SELECT stream_ordering, json FROM events
JOIN event_json USING (room_id, event_id)
JOIN event_json USING (event_id)
WHERE room_id = ?
AND stream_ordering < ?
AND contains_url = ? AND outlier = ?
@@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
if matches:
hostname = matches.group(1)
media_id = matches.group(2)
if hostname == self.hs.hostname:
if hostname == self.hostname:
local_media_mxcs.append(media_id)
else:
remote_media_mxcs.append((hostname, media_id))

View File

@@ -65,7 +65,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
defer.returnValue(hosts)
@cached(max_entries=100000, iterable=True)
@cachedInlineCallbacks(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
sql = (
@@ -79,7 +79,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql, (room_id, Membership.JOIN,))
return [to_ascii(r[0]) for r in txn]
return self.runInteraction("get_users_in_room", f)
start_time = self._clock.time_msec()
result = yield self.runInteraction("get_users_in_room", f)
end_time = self._clock.time_msec()
logger.info(
"Fetched room membership for %s (%i users) in %i ms",
room_id, len(result), end_time - start_time,
)
defer.returnValue(result)
@cached()
def get_invited_rooms_for_user(self, user_id):
@@ -453,7 +460,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
defer.returnValue(joined_hosts)
@cached(max_entries=10000, iterable=True)
@cached(max_entries=10000)
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)

View File

@@ -14,8 +14,6 @@
import logging
from synapse.config.appservice import load_appservices
from six.moves import range
logger = logging.getLogger(__name__)
@@ -60,7 +58,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
for as_id, user_ids in owned.items():
n = 100
user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n))
for chunk in user_chunks:
cur.execute(
database_engine.convert_param_style(

View File

@@ -1,57 +0,0 @@
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.storage.engines import PostgresEngine
from synapse.storage.prepare_database import get_statements
FIX_INDEXES = """
-- rebuild indexes as uniques
DROP INDEX groups_invites_g_idx;
CREATE UNIQUE INDEX group_invites_g_idx ON group_invites(group_id, user_id);
DROP INDEX groups_users_g_idx;
CREATE UNIQUE INDEX group_users_g_idx ON group_users(group_id, user_id);
-- rename other indexes to actually match their table names..
DROP INDEX groups_users_u_idx;
CREATE INDEX group_users_u_idx ON group_users(user_id);
DROP INDEX groups_invites_u_idx;
CREATE INDEX group_invites_u_idx ON group_invites(user_id);
DROP INDEX groups_rooms_g_idx;
CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id);
DROP INDEX groups_rooms_r_idx;
CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
"""
def run_create(cur, database_engine, *args, **kwargs):
rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
# remove duplicates from group_users & group_invites tables
cur.execute("""
DELETE FROM group_users WHERE %s NOT IN (
SELECT min(%s) FROM group_users GROUP BY group_id, user_id
);
""" % (rowid, rowid))
cur.execute("""
DELETE FROM group_invites WHERE %s NOT IN (
SELECT min(%s) FROM group_invites GROUP BY group_id, user_id
);
""" % (rowid, rowid))
for statement in get_statements(FIX_INDEXES.splitlines()):
cur.execute(statement)
def run_upgrade(*args, **kwargs):
pass

View File

@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
sql = (
"SELECT stream_ordering, event_id, room_id, type, json, "
" origin_server_ts FROM events"
" JOIN event_json USING (room_id, event_id)"
" JOIN event_json USING (event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
@@ -721,7 +721,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
return " & ".join(result + ":*" for result in results)
return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:

View File

@@ -20,7 +20,7 @@ from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
from synapse.util.caches import intern_string, get_cache_factor_for
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
@@ -54,7 +54,7 @@ class StateGroupWorkerStore(SQLBaseStore):
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
)
@cached(max_entries=100000, iterable=True)
@@ -566,8 +566,7 @@ class StateGroupWorkerStore(SQLBaseStore):
state_dict = results[group]
state_dict.update(
((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
for k, v in group_state_dict.iteritems()
group_state_dict.iteritems()
)
self._state_group_cache.update(

View File

@@ -41,14 +41,12 @@ from synapse.storage.events import EventsWorkerStore
from synapse.util.caches.descriptors import cached
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import abc
import logging
from six.moves import range
logger = logging.getLogger(__name__)
@@ -198,14 +196,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
results = {}
room_ids = list(room_ids)
for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)):
for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
res = yield make_deferred_yieldable(defer.gatherResults([
run_in_background(
self.get_room_events_stream_for_room,
preserve_fn(self.get_room_events_stream_for_room)(
room_id, from_key, to_key, limit, order=order,
)
for room_id in rm_ids
], consumeErrors=True))
]))
results.update(dict(zip(rm_ids, res)))
defer.returnValue(results)

View File

@@ -22,8 +22,6 @@ from twisted.internet import defer
import simplejson as json
import logging
from six.moves import range
logger = logging.getLogger(__name__)
@@ -100,7 +98,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
batch_size = 50
results = []
for i in range(0, len(tag_ids), batch_size):
for i in xrange(0, len(tag_ids), batch_size):
tags = yield self.runInteraction(
"get_all_updated_tag_content",
get_tag_content,

View File

@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from synapse.api.errors import SynapseError
from synapse.util.logcontext import PreserveLoggingContext
from twisted.internet import defer, reactor, task
@@ -23,6 +24,11 @@ import logging
logger = logging.getLogger(__name__)
class DeferredTimedOutError(SynapseError):
def __init__(self):
super(DeferredTimedOutError, self).__init__(504, "Timed out")
def unwrapFirstError(failure):
# defer.gatherResults and DeferredLists wrap failures.
failure.trap(defer.FirstError)
@@ -79,3 +85,53 @@ class Clock(object):
except Exception:
if not ignore_errs:
raise
def time_bound_deferred(self, given_deferred, time_out):
if given_deferred.called:
return given_deferred
ret_deferred = defer.Deferred()
def timed_out_fn():
e = DeferredTimedOutError()
try:
ret_deferred.errback(e)
except Exception:
pass
try:
given_deferred.cancel()
except Exception:
pass
timer = None
def cancel(res):
try:
self.cancel_call_later(timer)
except Exception:
pass
return res
ret_deferred.addBoth(cancel)
def success(res):
try:
ret_deferred.callback(res)
except Exception:
pass
return res
def err(res):
try:
ret_deferred.errback(res)
except Exception:
pass
given_deferred.addCallbacks(callback=success, errback=err)
timer = self.call_later(time_out, timed_out_fn)
return ret_deferred

View File

@@ -15,11 +15,9 @@
from twisted.internet import defer, reactor
from twisted.internet.defer import CancelledError
from twisted.python import failure
from .logcontext import (
PreserveLoggingContext, make_deferred_yieldable, run_in_background
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
)
from synapse.util import logcontext, unwrapFirstError
@@ -27,8 +25,6 @@ from contextlib import contextmanager
import logging
from six.moves import range
logger = logging.getLogger(__name__)
@@ -160,13 +156,13 @@ def concurrently_execute(func, args, limit):
def _concurrently_execute_inner():
try:
while True:
yield func(next(it))
yield func(it.next())
except StopIteration:
pass
return logcontext.make_deferred_yieldable(defer.gatherResults([
run_in_background(_concurrently_execute_inner)
for _ in range(limit)
preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit)
], consumeErrors=True)).addErrback(unwrapFirstError)
@@ -396,68 +392,3 @@ class ReadWriteLock(object):
self.key_to_current_writer.pop(key)
defer.returnValue(_ctx_manager())
class DeferredTimeoutError(Exception):
"""
This error is raised by default when a L{Deferred} times out.
"""
def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
"""
Add a timeout to a deferred by scheduling it to be cancelled after
timeout seconds.
This is essentially a backport of deferred.addTimeout, which was introduced
in twisted 16.5.
If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
unless a cancelable function was passed to its initialization or unless
a different on_timeout_cancel callable is provided.
Args:
deferred (defer.Deferred): deferred to be timed out
timeout (Number): seconds to time out after
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.
It takes an arbitrary value, which is the value of the deferred at
that exact point in time (probably a CancelledError Failure), and
the timeout.
The default callable (if none is provided) will translate a
CancelledError Failure into a DeferredTimeoutError.
"""
timed_out = [False]
def time_it_out():
timed_out[0] = True
deferred.cancel()
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value):
if timed_out[0]:
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value
deferred.addBoth(convert_cancelled)
def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired
if delayed_call.active():
delayed_call.cancel()
return result
deferred.addBoth(cancel_timeout)
def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise DeferredTimeoutError(timeout, "Deferred")
return value

View File

@@ -18,6 +18,16 @@ import os
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
def get_cache_factor_for(cache_name):
env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
factor = os.environ.get(env_var)
if factor:
return float(factor)
return CACHE_SIZE_FACTOR
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
caches_by_name = {}

View File

@@ -17,7 +17,7 @@ import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.stringutils import to_ascii
@@ -310,7 +310,7 @@ class CacheDescriptor(_CacheDescriptorBase):
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
cache_context=cache_context)
max_entries = int(max_entries * CACHE_SIZE_FACTOR)
max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
self.max_entries = max_entries
self.tree = tree

View File

@@ -12,15 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.util.async import ObservableDeferred
from synapse.util.caches import metrics as cache_metrics
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
class ResponseCache(object):
@@ -37,7 +31,6 @@ class ResponseCache(object):
self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000.
self._name = name
self._metrics = cache_metrics.register_cache(
"response_cache",
size_callback=lambda: self.size(),
@@ -50,21 +43,15 @@ class ResponseCache(object):
def get(self, key):
"""Look up the given key.
Can return either a new Deferred (which also doesn't follow the synapse
logcontext rules), or, if the request has completed, the actual
result. You will probably want to make_deferred_yieldable the result.
If there is no entry for the key, returns None. It is worth noting that
this means there is no way to distinguish a completed result of None
from an absent cache entry.
Returns a deferred which doesn't follow the synapse logcontext rules,
so you'll probably want to make_deferred_yieldable it.
Args:
key (hashable):
key (str):
Returns:
twisted.internet.defer.Deferred|None|E: None if there is no entry
for this key; otherwise either a deferred result or the result
itself.
twisted.internet.defer.Deferred|None: None if there is no entry
for this key; otherwise a deferred result.
"""
result = self.pending_result_cache.get(key)
if result is not None:
@@ -81,17 +68,19 @@ class ResponseCache(object):
you should wrap normal synapse deferreds with
logcontext.run_in_background).
Can return either a new Deferred (which also doesn't follow the synapse
logcontext rules), or, if *deferred* was already complete, the actual
result. You will probably want to make_deferred_yieldable the result.
Returns a new Deferred which also doesn't follow the synapse logcontext
rules, so you will want to make_deferred_yieldable it
(TODO: before using this more widely, it might make sense to refactor
it and get() so that they do the necessary wrapping rather than having
to do it everywhere ResponseCache is used.)
Args:
key (hashable):
deferred (twisted.internet.defer.Deferred[T):
key (str):
deferred (twisted.internet.defer.Deferred):
Returns:
twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
result.
twisted.internet.defer.Deferred
"""
result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result
@@ -108,52 +97,3 @@ class ResponseCache(object):
result.addBoth(remove)
return result.observe()
def wrap(self, key, callback, *args, **kwargs):
"""Wrap together a *get* and *set* call, taking care of logcontexts
First looks up the key in the cache, and if it is present makes it
follow the synapse logcontext rules and returns it.
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
follow the synapse logcontext rules, and adds the result to the cache.
Example usage:
@defer.inlineCallbacks
def handle_request(request):
# etc
defer.returnValue(result)
result = yield response_cache.wrap(
key,
handle_request,
request,
)
Args:
key (hashable): key to get/set in the cache
callback (callable): function to call if the key is not found in
the cache
*args: positional parameters to pass to the callback, if it is used
**kwargs: named paramters to pass to the callback, if it is used
Returns:
twisted.internet.defer.Deferred: yieldable result
"""
result = self.get(key)
if not result:
logger.info("[%s]: no cached result for [%s], calculating new one",
self._name, key)
d = run_in_background(callback, *args, **kwargs)
result = self.set(key, d)
elif not isinstance(result, defer.Deferred) or result.called:
logger.info("[%s]: using completed cached result for [%s]",
self._name, key)
else:
logger.info("[%s]: using incomplete cached result for [%s]",
self._name, key)
return make_deferred_yieldable(result)

View File

@@ -15,9 +15,9 @@
from twisted.internet import threads, reactor
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from six.moves import queue
import Queue
class BackgroundFileConsumer(object):
@@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
# Queue of slices of bytes to be written. When producer calls
# unregister a final None is sent.
self._bytes_queue = queue.Queue()
self._bytes_queue = Queue.Queue()
# Deferred that is resolved when finished writing
self._finished_deferred = None
@@ -70,9 +70,7 @@ class BackgroundFileConsumer(object):
self._producer = producer
self.streaming = streaming
self._finished_deferred = run_in_background(
threads.deferToThread, self._writer
)
self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer)
if not streaming:
self._producer.resumeProducing()

View File

@@ -40,12 +40,9 @@ def create_resource_tree(desired_tree, root_resource):
# extra resources to existing nodes. See self._resource_id for the key.
resource_mappings = {}
for full_path, res in desired_tree.items():
# twisted requires all resources to be bytes
full_path = full_path.encode("utf-8")
logger.info("Attaching %s to path %s", res, full_path)
last_resource = root_resource
for path_seg in full_path.split(b'/')[1:-1]:
for path_seg in full_path.split('/')[1:-1]:
if path_seg not in last_resource.listNames():
# resource doesn't exist, so make a "dummy resource"
child_resource = NoResource()
@@ -60,7 +57,7 @@ def create_resource_tree(desired_tree, root_resource):
# ===========================
# now attach the actual desired resource
last_path_seg = full_path.split(b'/')[-1]
last_path_seg = full_path.split('/')[-1]
# if there is already a resource here, thieve its children and
# replace it

View File

@@ -92,7 +92,6 @@ class LoggingContext(object):
def __nonzero__(self):
return False
__bool__ = __nonzero__ # python3
sentinel = Sentinel()
@@ -164,7 +163,7 @@ class LoggingContext(object):
current = self.set_current_context(self.previous_context)
if current is not self:
if current is self.sentinel:
logger.warn("Expected logging context %s has been lost", self)
logger.debug("Expected logging context %s has been lost", self)
else:
logger.warn(
"Current logging context %s is not expected context %s",
@@ -279,7 +278,7 @@ class PreserveLoggingContext(object):
context = LoggingContext.set_current_context(self.current_context)
if context != self.new_context:
logger.warn(
logger.debug(
"Unexpected logging context: %s is not %s",
context, self.new_context,
)
@@ -302,49 +301,31 @@ def preserve_fn(f):
def run_in_background(f, *args, **kwargs):
"""Calls a function, ensuring that the current context is restored after
return from the function, and that the sentinel context is set once the
deferred returned by the function completes.
deferred returned by the funtion completes.
Useful for wrapping functions that return a deferred which you don't yield
on (for instance because you want to pass it to deferred.gatherResults()).
Note that if you completely discard the result, you should make sure that
`f` doesn't raise any deferred exceptions, otherwise a scary-looking
CRITICAL error about an unhandled error will be logged without much
indication about where it came from.
on.
"""
current = LoggingContext.current_context()
try:
res = f(*args, **kwargs)
except: # noqa: E722
# the assumption here is that the caller doesn't want to be disturbed
# by synchronous exceptions, so let's turn them into Failures.
return defer.fail()
res = f(*args, **kwargs)
if isinstance(res, defer.Deferred) and not res.called:
# The function will have reset the context before returning, so
# we need to restore it now.
LoggingContext.set_current_context(current)
if not isinstance(res, defer.Deferred):
return res
if res.called and not res.paused:
# The function should have maintained the logcontext, so we can
# optimise out the messing about
return res
# The function may have reset the context before returning, so
# we need to restore it now.
ctx = LoggingContext.set_current_context(current)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
res.addBoth(_set_context_cb, ctx)
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
res.addBoth(_set_context_cb, LoggingContext.sentinel)
return res
@@ -359,20 +340,11 @@ def make_deferred_yieldable(deferred):
returning a deferred. Then, when the deferred completes, restores the
current logcontext before running callbacks/errbacks.
(This is more-or-less the opposite operation to run_in_background.)
(This is more-or-less the opposite operation to preserve_fn.)
"""
if not isinstance(deferred, defer.Deferred):
return deferred
if deferred.called and not deferred.paused:
# it looks like this deferred is ready to run any callbacks we give it
# immediately. We may as well optimise out the logcontext faffery.
return deferred
# ok, we can't be sure that a yield won't block, so let's reset the
# logcontext, and add a callback to the deferred to restore it.
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
deferred.addBoth(_set_context_cb, prev_context)
if isinstance(deferred, defer.Deferred) and not deferred.called:
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
deferred.addBoth(_set_context_cb, prev_context)
return deferred

View File

@@ -14,7 +14,7 @@
# limitations under the License.
from six import StringIO
import StringIO
import logging
import traceback
@@ -32,7 +32,7 @@ class LogFormatter(logging.Formatter):
super(LogFormatter, self).__init__(*args, **kwargs)
def formatException(self, ei):
sio = StringIO()
sio = StringIO.StringIO()
(typ, val, tb) = ei
# log the stack above the exception capture point if possible, but

Some files were not shown because too many files have changed in this diff Show More