1
0

Compare commits

...

57 Commits

Author SHA1 Message Date
Erik Johnston
67362a9a03 Merge branch 'release-v0.9.3' of github.com:matrix-org/synapse 2015-07-01 15:12:57 +01:00
Erik Johnston
480d720388 Bump changelog and version to v0.9.3 2015-07-01 15:12:32 +01:00
Matthew Hodgson
11374a77ef clarify readme a bit more 2015-06-27 16:59:47 +02:00
Erik Johnston
f043b14bc0 Update change log 2015-06-23 10:22:42 +01:00
Erik Johnston
9c72011fd7 Bumb version 2015-06-23 10:12:19 +01:00
Erik Johnston
2f556e0c55 Fix typo 2015-06-19 16:22:53 +01:00
Erik Johnston
7fa1363fb0 Merge pull request #192 from matrix-org/erikj/fix_log_context
Fix log context when sending requests
2015-06-19 16:21:40 +01:00
Erik Johnston
275dab6b55 Merge pull request #190 from matrix-org/erikj/syn-412
Fix notifier leak
2015-06-19 11:49:58 +01:00
Erik Johnston
a68abc79fd Add comment on cancellation of observers 2015-06-19 11:48:55 +01:00
Erik Johnston
653533a3da Fix log context when sending requests 2015-06-19 11:46:49 +01:00
Erik Johnston
9bf61ef97b Merge pull request #189 from matrix-org/erikj/room_init_sync
Improve room init sync speed.
2015-06-19 11:36:06 +01:00
Erik Johnston
0e58d19163 Merge pull request #187 from matrix-org/erikj/sanitize_logging
Sanitize logging
2015-06-19 11:35:59 +01:00
Erik Johnston
18968efa0a Remove stale debug lines 2015-06-19 10:18:02 +01:00
Erik Johnston
eb928c9f52 Add site_tag to logger 2015-06-19 10:16:48 +01:00
Erik Johnston
9d112f4440 Add IDs to outbound transactions 2015-06-19 10:13:03 +01:00
Erik Johnston
ad460a8315 Add Eric Myhre to AUTHORS 2015-06-19 09:23:26 +01:00
Erik Johnston
bf628cf6dd Merge pull request #191 from heavenlyhash/configurable-upload-dir
Make upload dir a configurable path.
2015-06-19 09:16:04 +01:00
Eric Myhre
9e5a353663 Make upload dir a configurable path.
Fixes SYN-425.

Signed-off-by: Eric Myhre <hash@exultant.us>
2015-06-18 23:38:20 -05:00
Erik Johnston
6f6ebd216d PEP8 2015-06-18 17:00:32 +01:00
Erik Johnston
73513ececc Documentation 2015-06-18 16:15:10 +01:00
Erik Johnston
1f24c2e589 Don't bother proxying lookups on _NotificationListener to underlying deferred 2015-06-18 16:09:53 +01:00
Erik Johnston
22049ea700 Refactor the notifier.wait_for_events code to be clearer. Add _NotifierUserStream.new_listener that accpets a token to avoid races. 2015-06-18 15:49:24 +01:00
Erik Johnston
050ebccf30 Fix notifier leak 2015-06-18 11:36:26 +01:00
Kegan Dougal
d88e20cdb9 Fix bug where synapse was sending AS user queries incorrectly.
Bug introduced in 92b20713d7
which reversed the comparison when checking if a user existed
in the users table. Added UTs to prevent this happening again.
2015-06-17 17:26:03 +01:00
Erik Johnston
eceb554a2f Use another deferred list 2015-06-16 17:12:27 +01:00
Erik Johnston
b849a64f8d Use DeferredList 2015-06-16 17:03:24 +01:00
Erik Johnston
0460406298 Don't do unecessary db ops in presence.get_state 2015-06-16 16:59:38 +01:00
Paul "LeoNerd" Evans
9a3cd1c00d Correct -H SERVER_NAME in config-missing complaint message 2015-06-16 16:03:35 +01:00
Erik Johnston
fb7def3344 Remove access_token from synapse.rest.client.v1.transactions {get,store}_response logging 2015-06-16 10:09:43 +01:00
Erik Johnston
f13890ddce Merge branch 'develop' of github.com:matrix-org/synapse into erikj/sanitize_logging 2015-06-15 18:26:24 +01:00
Erik Johnston
aaa749d366 Disable twisted access logging. Move access logging to SynapseRequest object 2015-06-15 18:18:05 +01:00
Erik Johnston
bc42ca121f Merge pull request #185 from matrix-org/erikj/listeners_config
Change listener config.
2015-06-15 18:05:58 +01:00
Erik Johnston
cee69441d3 Log more when we have processed the request 2015-06-15 17:11:44 +01:00
Erik Johnston
b5209c5744 Create SynapseRequest that overrides __repr__ to not print access_token 2015-06-15 16:37:04 +01:00
Erik Johnston
f0583f65e1 Merge branch 'master' of github.com:matrix-org/synapse into develop 2015-06-15 15:17:47 +01:00
Erik Johnston
44c9102e7a Merge branch 'erikj/listeners_config' into erikj/sanitize_logging 2015-06-15 14:45:52 +01:00
Erik Johnston
6a7cf6b41f Merge pull request #186 from matrix-org/hotfixes-v0.9.2-r2
Hotfixes v0.9.2-r2
2015-06-15 14:39:46 +01:00
Erik Johnston
2acee97c2b Changelog 2015-06-15 14:20:25 +01:00
Erik Johnston
7f7ec84d6f Bump version 2015-06-15 14:16:29 +01:00
Erik Johnston
cebde85b94 Merge branch 'master' of github.com:matrix-org/synapse into hotfixes-v0.9.2-r2 2015-06-15 14:16:12 +01:00
Erik Johnston
f00f8346f1 Make http.server request logging more verbose, but redact access_tokens 2015-06-15 13:37:58 +01:00
Erik Johnston
83f119a84a Log requests and responses sent via http.client 2015-06-15 13:14:12 +01:00
Erik Johnston
9d0326baa6 Remove redundant newline 2015-06-15 11:27:29 +01:00
Erik Johnston
186f61a3ac Document listener config. Remove deprecated config options 2015-06-15 11:25:53 +01:00
Erik Johnston
fe9bac3749 Merge pull request #184 from matrix-org/hotfixes-v0.9.2-r1
Hotfixes v0.9.2-r1
2015-06-13 12:26:42 +01:00
Erik Johnston
6c01ceb8d0 Bump version 2015-06-13 12:22:14 +01:00
Erik Johnston
2eda996a63 Add a dummy.sql into delta/20 as pip isn't packinging the pushers.py 2015-06-13 12:21:58 +01:00
Matthew Hodgson
4706f3964d Merge pull request #183 from intelfx/install-python-schema-deltas
MANIFEST.in: include python schema delta scripts
2015-06-13 12:09:17 +01:00
Ivan Shapovalov
4df76b0a5d MANIFEST.in: include python schema delta scripts (we now have one in 20/) 2015-06-13 11:08:49 +03:00
Erik Johnston
a005b7269a Add backwards compat support for metrics, manhole and webclient config options 2015-06-12 17:44:23 +01:00
Erik Johnston
261ccd7f5f Fix tests 2015-06-12 17:17:29 +01:00
Erik Johnston
942e39e87c PEP8 2015-06-12 17:13:54 +01:00
Erik Johnston
9c5fc81c2d Correctly handle x_forwaded listener option 2015-06-12 17:13:23 +01:00
Erik Johnston
fd2c07bfed Use config.listeners 2015-06-12 15:33:07 +01:00
Erik Johnston
c42ed47660 Fix up create_resource_tree 2015-06-12 11:52:52 +01:00
Erik Johnston
295b400d57 Merge branch 'release-v0.9.2' into develop 2015-06-11 16:08:48 +01:00
Erik Johnston
f7f07dc517 Begin changing the config format 2015-06-11 15:48:52 +01:00
28 changed files with 801 additions and 485 deletions

View File

@@ -38,3 +38,7 @@ Brabo <brabo at riseup.net>
Ivan Shapovalov <intelfx100 at gmail.com>
* contrib/systemd: a sample systemd unit file and a logger configuration
Eric Myhre <hash at exultant.us>
* Fix bug where ``media_store_path`` config option was ignored by v0 content
repository API.

View File

@@ -1,3 +1,31 @@
Changes in synapse v0.9.3 (2015-07-01)
======================================
No changes from v0.9.3 Release Candidate 1.
Changes in synapse v0.9.3-rc1 (2015-06-23)
==========================================
General:
* Fix a memory leak in the notifier. (SYN-412)
* Improve performance of room initial sync. (SYN-418)
* General improvements to logging.
* Remove ``access_token`` query params from ``INFO`` level logging.
Configuration:
* Add support for specifying and configuring multiple listeners. (SYN-389)
Application services:
* Fix bug where synapse failed to send user queries to application services.
Changes in synapse v0.9.2-r2 (2015-06-15)
=========================================
Fix packaging so that schema delta python files get included in the package.
Changes in synapse v0.9.2 (2015-06-12)
======================================

View File

@@ -5,6 +5,7 @@ include *.rst
include demo/README
recursive-include synapse/storage/schema *.sql
recursive-include synapse/storage/schema *.py
recursive-include demo *.dh
recursive-include demo *.py

View File

@@ -122,7 +122,10 @@ To install the synapse homeserver run::
$ pip install --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
This installs synapse, along with the libraries it uses, into a virtual
environment under ``~/.synapse``.
environment under ``~/.synapse``. Feel free to pick a different directory
if you prefer.
In case of problems, please see the _Troubleshooting section below.
Alternatively, Silvio Fricke has contributed a Dockerfile to automate the
above in Docker at https://registry.hub.docker.com/u/silviof/docker-matrix/.

View File

@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.9.2"
__version__ = "0.9.3"

View File

@@ -370,6 +370,8 @@ class Auth(object):
user_agent=user_agent
)
request.authenticated_entity = user.to_string()
defer.returnValue((user, ClientInfo(device_id, token_id)))
except KeyError:
raise AuthError(

View File

@@ -34,8 +34,7 @@ from twisted.application import service
from twisted.enterprise import adbapi
from twisted.web.resource import Resource, EncodingResourceWrapper
from twisted.web.static import File
from twisted.web.server import Site, GzipEncoderFactory
from twisted.web.http import proxiedLogFormatter, combinedLogFormatter
from twisted.web.server import Site, GzipEncoderFactory, Request
from synapse.http.server import JsonResource, RootRedirect
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
@@ -61,11 +60,13 @@ import twisted.manhole.telnet
import synapse
import contextlib
import logging
import os
import re
import resource
import subprocess
import time
logger = logging.getLogger("synapse.app.homeserver")
@@ -87,16 +88,10 @@ class SynapseHomeServer(HomeServer):
return MatrixFederationHttpClient(self)
def build_resource_for_client(self):
res = ClientV1RestResource(self)
if self.config.gzip_responses:
res = gz_wrap(res)
return res
return ClientV1RestResource(self)
def build_resource_for_client_v2_alpha(self):
res = ClientV2AlphaRestResource(self)
if self.config.gzip_responses:
res = gz_wrap(res)
return res
return ClientV2AlphaRestResource(self)
def build_resource_for_federation(self):
return JsonResource(self)
@@ -119,7 +114,7 @@ class SynapseHomeServer(HomeServer):
def build_resource_for_content_repo(self):
return ContentRepoResource(
self, self.upload_dir, self.auth, self.content_addr
self, self.config.uploads_path, self.auth, self.content_addr
)
def build_resource_for_media_repository(self):
@@ -145,152 +140,105 @@ class SynapseHomeServer(HomeServer):
**self.db_config.get("args", {})
)
def create_resource_tree(self, redirect_root_to_web_client):
"""Create the resource tree for this Home Server.
def _listener_http(self, config, listener_config):
port = listener_config["port"]
bind_address = listener_config.get("bind_address", "")
tls = listener_config.get("tls", False)
site_tag = listener_config.get("tag", port)
This in unduly complicated because Twisted does not support putting
child resources more than 1 level deep at a time.
Args:
web_client (bool): True to enable the web client.
redirect_root_to_web_client (bool): True to redirect '/' to the
location of the web client. This does nothing if web_client is not
True.
"""
config = self.get_config()
web_client = config.web_client
# list containing (path_str, Resource) e.g:
# [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ]
desired_tree = [
(CLIENT_PREFIX, self.get_resource_for_client()),
(CLIENT_V2_ALPHA_PREFIX, self.get_resource_for_client_v2_alpha()),
(FEDERATION_PREFIX, self.get_resource_for_federation()),
(CONTENT_REPO_PREFIX, self.get_resource_for_content_repo()),
(SERVER_KEY_PREFIX, self.get_resource_for_server_key()),
(SERVER_KEY_V2_PREFIX, self.get_resource_for_server_key_v2()),
(MEDIA_PREFIX, self.get_resource_for_media_repository()),
(STATIC_PREFIX, self.get_resource_for_static_content()),
]
if web_client:
logger.info("Adding the web client.")
desired_tree.append((WEB_CLIENT_PREFIX,
self.get_resource_for_web_client()))
if web_client and redirect_root_to_web_client:
self.root_resource = RootRedirect(WEB_CLIENT_PREFIX)
else:
self.root_resource = Resource()
if tls and config.no_tls:
return
metrics_resource = self.get_resource_for_metrics()
if config.metrics_port is None and metrics_resource is not None:
desired_tree.append((METRICS_PREFIX, metrics_resource))
# ideally we'd just use getChild and putChild but getChild doesn't work
# unless you give it a Request object IN ADDITION to the name :/ So
# instead, we'll store a copy of this mapping so we can actually add
# extra resources to existing nodes. See self._resource_id for the key.
resource_mappings = {}
for full_path, res in desired_tree:
logger.info("Attaching %s to path %s", res, full_path)
last_resource = self.root_resource
for path_seg in full_path.split('/')[1:-1]:
if path_seg not in last_resource.listNames():
# resource doesn't exist, so make a "dummy resource"
child_resource = Resource()
last_resource.putChild(path_seg, child_resource)
res_id = self._resource_id(last_resource, path_seg)
resource_mappings[res_id] = child_resource
last_resource = child_resource
else:
# we have an existing Resource, use that instead.
res_id = self._resource_id(last_resource, path_seg)
last_resource = resource_mappings[res_id]
resources = {}
for res in listener_config["resources"]:
for name in res["names"]:
if name == "client":
if res["compress"]:
client_v1 = gz_wrap(self.get_resource_for_client())
client_v2 = gz_wrap(self.get_resource_for_client_v2_alpha())
else:
client_v1 = self.get_resource_for_client()
client_v2 = self.get_resource_for_client_v2_alpha()
# ===========================
# now attach the actual desired resource
last_path_seg = full_path.split('/')[-1]
resources.update({
CLIENT_PREFIX: client_v1,
CLIENT_V2_ALPHA_PREFIX: client_v2,
})
# if there is already a resource here, thieve its children and
# replace it
res_id = self._resource_id(last_resource, last_path_seg)
if res_id in resource_mappings:
# there is a dummy resource at this path already, which needs
# to be replaced with the desired resource.
existing_dummy_resource = resource_mappings[res_id]
for child_name in existing_dummy_resource.listNames():
child_res_id = self._resource_id(existing_dummy_resource,
child_name)
child_resource = resource_mappings[child_res_id]
# steal the children
res.putChild(child_name, child_resource)
if name == "federation":
resources.update({
FEDERATION_PREFIX: self.get_resource_for_federation(),
})
# finally, insert the desired resource in the right place
last_resource.putChild(last_path_seg, res)
res_id = self._resource_id(last_resource, last_path_seg)
resource_mappings[res_id] = res
if name in ["static", "client"]:
resources.update({
STATIC_PREFIX: self.get_resource_for_static_content(),
})
return self.root_resource
if name in ["media", "federation", "client"]:
resources.update({
MEDIA_PREFIX: self.get_resource_for_media_repository(),
CONTENT_REPO_PREFIX: self.get_resource_for_content_repo(),
})
def _resource_id(self, resource, path_seg):
"""Construct an arbitrary resource ID so you can retrieve the mapping
later.
if name in ["keys", "federation"]:
resources.update({
SERVER_KEY_PREFIX: self.get_resource_for_server_key(),
SERVER_KEY_V2_PREFIX: self.get_resource_for_server_key_v2(),
})
If you want to represent resource A putChild resource B with path C,
the mapping should looks like _resource_id(A,C) = B.
if name == "webclient":
resources[WEB_CLIENT_PREFIX] = self.get_resource_for_web_client()
Args:
resource (Resource): The *parent* Resource
path_seg (str): The name of the child Resource to be attached.
Returns:
str: A unique string which can be a key to the child Resource.
"""
return "%s-%s" % (resource, path_seg)
if name == "metrics" and metrics_resource:
resources[METRICS_PREFIX] = metrics_resource
root_resource = create_resource_tree(resources)
if tls:
reactor.listenSSL(
port,
SynapseSite(
"synapse.access.https.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
),
self.tls_context_factory,
interface=bind_address
)
else:
reactor.listenTCP(
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
),
interface=bind_address
)
logger.info("Synapse now listening on port %d", port)
def start_listening(self):
config = self.get_config()
if not config.no_tls and config.bind_port is not None:
reactor.listenSSL(
config.bind_port,
SynapseSite(
"synapse.access.https",
config,
self.root_resource,
),
self.tls_context_factory,
interface=config.bind_host
)
logger.info("Synapse now listening on port %d", config.bind_port)
if config.unsecure_port is not None:
reactor.listenTCP(
config.unsecure_port,
SynapseSite(
"synapse.access.http",
config,
self.root_resource,
),
interface=config.bind_host
)
logger.info("Synapse now listening on port %d", config.unsecure_port)
metrics_resource = self.get_resource_for_metrics()
if metrics_resource and config.metrics_port is not None:
reactor.listenTCP(
config.metrics_port,
SynapseSite(
"synapse.access.metrics",
config,
metrics_resource,
),
interface=config.metrics_bind_host,
)
logger.info(
"Metrics now running on %s port %d",
config.metrics_bind_host, config.metrics_port,
)
for listener in config.listeners:
if listener["type"] == "http":
self._listener_http(config, listener)
elif listener["type"] == "manhole":
f = twisted.manhole.telnet.ShellFactory()
f.username = "matrix"
f.password = "rabbithole"
f.namespace['hs'] = self
reactor.listenTCP(
listener["port"],
f,
interface=listener.get("bind_address", '127.0.0.1')
)
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
def run_startup_checks(self, db_conn, database_engine):
all_users_native = are_all_users_on_domain(
@@ -425,11 +373,6 @@ def setup(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
if re.search(":[0-9]+$", config.server_name):
domain_with_port = config.server_name
else:
domain_with_port = "%s:%s" % (config.server_name, config.bind_port)
tls_context_factory = context_factory.ServerContextFactory(config)
database_engine = create_engine(config.database_config["name"])
@@ -437,8 +380,6 @@ def setup(config_options):
hs = SynapseHomeServer(
config.server_name,
domain_with_port=domain_with_port,
upload_dir=os.path.abspath("uploads"),
db_config=config.database_config,
tls_context_factory=tls_context_factory,
config=config,
@@ -447,10 +388,6 @@ def setup(config_options):
database_engine=database_engine,
)
hs.create_resource_tree(
redirect_root_to_web_client=True,
)
logger.info("Preparing database: %r...", config.database_config)
try:
@@ -475,13 +412,6 @@ def setup(config_options):
logger.info("Database prepared in %r.", config.database_config)
if config.manhole:
f = twisted.manhole.telnet.ShellFactory()
f.username = "matrix"
f.password = "rabbithole"
f.namespace['hs'] = hs
reactor.listenTCP(config.manhole, f, interface='127.0.0.1')
hs.start_listening()
hs.get_pusherpool().start()
@@ -507,22 +437,194 @@ class SynapseService(service.Service):
return self._port.stopListening()
class SynapseRequest(Request):
def __init__(self, site, *args, **kw):
Request.__init__(self, *args, **kw)
self.site = site
self.authenticated_entity = None
self.start_time = 0
def __repr__(self):
# We overwrite this so that we don't log ``access_token``
return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % (
self.__class__.__name__,
id(self),
self.method,
self.get_redacted_uri(),
self.clientproto,
self.site.site_tag,
)
def get_redacted_uri(self):
return re.sub(
r'(\?.*access_token=)[^&]*(.*)$',
r'\1<redacted>\2',
self.uri
)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
def started_processing(self):
self.site.access_logger.info(
"%s - %s - Received request: %s %s",
self.getClientIP(),
self.site.site_tag,
self.method,
self.get_redacted_uri()
)
self.start_time = int(time.time() * 1000)
def finished_processing(self):
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %dms %sB %s \"%s %s %s\" \"%s\"",
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
int(time.time() * 1000) - self.start_time,
self.sentLength,
self.code,
self.method,
self.get_redacted_uri(),
self.clientproto,
self.get_user_agent(),
)
@contextlib.contextmanager
def processing(self):
self.started_processing()
yield
self.finished_processing()
class XForwardedForRequest(SynapseRequest):
def __init__(self, *args, **kw):
SynapseRequest.__init__(self, *args, **kw)
"""
Add a layer on top of another request that only uses the value of an
X-Forwarded-For header as the result of C{getClientIP}.
"""
def getClientIP(self):
"""
@return: The client address (the first address) in the value of the
I{X-Forwarded-For header}. If the header is not present, return
C{b"-"}.
"""
return self.requestHeaders.getRawHeaders(
b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
class SynapseRequestFactory(object):
def __init__(self, site, x_forwarded_for):
self.site = site
self.x_forwarded_for = x_forwarded_for
def __call__(self, *args, **kwargs):
if self.x_forwarded_for:
return XForwardedForRequest(self.site, *args, **kwargs)
else:
return SynapseRequest(self.site, *args, **kwargs)
class SynapseSite(Site):
"""
Subclass of a twisted http Site that does access logging with python's
standard logging
"""
def __init__(self, logger_name, config, resource, *args, **kwargs):
def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs):
Site.__init__(self, resource, *args, **kwargs)
if config.captcha_ip_origin_is_x_forwarded:
self._log_formatter = proxiedLogFormatter
else:
self._log_formatter = combinedLogFormatter
self.site_tag = site_tag
proxied = config.get("x_forwarded", False)
self.requestFactory = SynapseRequestFactory(self, proxied)
self.access_logger = logging.getLogger(logger_name)
def log(self, request):
line = self._log_formatter(self._logDateTime, request)
self.access_logger.info(line)
pass
def create_resource_tree(desired_tree, redirect_root_to_web_client=True):
"""Create the resource tree for this Home Server.
This in unduly complicated because Twisted does not support putting
child resources more than 1 level deep at a time.
Args:
web_client (bool): True to enable the web client.
redirect_root_to_web_client (bool): True to redirect '/' to the
location of the web client. This does nothing if web_client is not
True.
"""
if redirect_root_to_web_client and WEB_CLIENT_PREFIX in desired_tree:
root_resource = RootRedirect(WEB_CLIENT_PREFIX)
else:
root_resource = Resource()
# ideally we'd just use getChild and putChild but getChild doesn't work
# unless you give it a Request object IN ADDITION to the name :/ So
# instead, we'll store a copy of this mapping so we can actually add
# extra resources to existing nodes. See self._resource_id for the key.
resource_mappings = {}
for full_path, res in desired_tree.items():
logger.info("Attaching %s to path %s", res, full_path)
last_resource = root_resource
for path_seg in full_path.split('/')[1:-1]:
if path_seg not in last_resource.listNames():
# resource doesn't exist, so make a "dummy resource"
child_resource = Resource()
last_resource.putChild(path_seg, child_resource)
res_id = _resource_id(last_resource, path_seg)
resource_mappings[res_id] = child_resource
last_resource = child_resource
else:
# we have an existing Resource, use that instead.
res_id = _resource_id(last_resource, path_seg)
last_resource = resource_mappings[res_id]
# ===========================
# now attach the actual desired resource
last_path_seg = full_path.split('/')[-1]
# if there is already a resource here, thieve its children and
# replace it
res_id = _resource_id(last_resource, last_path_seg)
if res_id in resource_mappings:
# there is a dummy resource at this path already, which needs
# to be replaced with the desired resource.
existing_dummy_resource = resource_mappings[res_id]
for child_name in existing_dummy_resource.listNames():
child_res_id = _resource_id(
existing_dummy_resource, child_name
)
child_resource = resource_mappings[child_res_id]
# steal the children
res.putChild(child_name, child_resource)
# finally, insert the desired resource in the right place
last_resource.putChild(last_path_seg, res)
res_id = _resource_id(last_resource, last_path_seg)
resource_mappings[res_id] = res
return root_resource
def _resource_id(resource, path_seg):
"""Construct an arbitrary resource ID so you can retrieve the mapping
later.
If you want to represent resource A putChild resource B with path C,
the mapping should looks like _resource_id(A,C) = B.
Args:
resource (Resource): The *parent* Resource
path_seg (str): The name of the child Resource to be attached.
Returns:
str: A unique string which can be a key to the child Resource.
"""
return "%s-%s" % (resource, path_seg)
def run(hs):

View File

@@ -148,7 +148,7 @@ class Config(object):
if not config_args.config_path:
config_parser.error(
"Must supply a config file.\nA config file can be automatically"
" generated using \"--generate-config -h SERVER_NAME"
" generated using \"--generate-config -H SERVER_NAME"
" -c CONFIG-FILE\""
)
@@ -209,7 +209,7 @@ class Config(object):
if not config_args.config_path:
config_parser.error(
"Must supply a config file.\nA config file can be automatically"
" generated using \"--generate-config -h SERVER_NAME"
" generated using \"--generate-config -H SERVER_NAME"
" -c CONFIG-FILE\""
)

View File

@@ -21,10 +21,6 @@ class CaptchaConfig(Config):
self.recaptcha_private_key = config["recaptcha_private_key"]
self.recaptcha_public_key = config["recaptcha_public_key"]
self.enable_registration_captcha = config["enable_registration_captcha"]
# XXX: This is used for more than just captcha
self.captcha_ip_origin_is_x_forwarded = (
config["captcha_ip_origin_is_x_forwarded"]
)
self.captcha_bypass_secret = config.get("captcha_bypass_secret")
self.recaptcha_siteverify_api = config["recaptcha_siteverify_api"]
@@ -43,10 +39,6 @@ class CaptchaConfig(Config):
# public/private key.
enable_registration_captcha: False
# When checking captchas, use the X-Forwarded-For (XFF) header
# as the client IP and not the actual client IP.
captcha_ip_origin_is_x_forwarded: False
# A secret key used to bypass the captcha test entirely.
#captcha_bypass_secret: "YOUR_SECRET_HERE"

View File

@@ -28,10 +28,4 @@ class MetricsConfig(Config):
# Enable collection and rendering of performance metrics
enable_metrics: False
# Separate port to accept metrics requests on
# metrics_port: 8081
# Which host to bind the metric listener to
# metrics_bind_host: 127.0.0.1
"""

View File

@@ -21,13 +21,18 @@ class ContentRepositoryConfig(Config):
self.max_upload_size = self.parse_size(config["max_upload_size"])
self.max_image_pixels = self.parse_size(config["max_image_pixels"])
self.media_store_path = self.ensure_directory(config["media_store_path"])
self.uploads_path = self.ensure_directory(config["uploads_path"])
def default_config(self, config_dir_path, server_name):
media_store = self.default_path("media_store")
uploads_path = self.default_path("uploads")
return """
# Directory where uploaded images and attachments are stored.
media_store_path: "%(media_store)s"
# Directory where in-progress uploads are stored.
uploads_path: "%(uploads_path)s"
# The largest allowed upload size in bytes
max_upload_size: "10M"

View File

@@ -20,26 +20,97 @@ class ServerConfig(Config):
def read_config(self, config):
self.server_name = config["server_name"]
self.bind_port = config["bind_port"]
self.bind_host = config["bind_host"]
self.unsecure_port = config["unsecure_port"]
self.manhole = config.get("manhole")
self.pid_file = self.abspath(config.get("pid_file"))
self.web_client = config["web_client"]
self.soft_file_limit = config["soft_file_limit"]
self.daemonize = config.get("daemonize")
self.use_frozen_dicts = config.get("use_frozen_dicts", True)
self.gzip_responses = config["gzip_responses"]
self.listeners = config.get("listeners", [])
bind_port = config.get("bind_port")
if bind_port:
self.listeners = []
bind_host = config.get("bind_host", "")
gzip_responses = config.get("gzip_responses", True)
names = ["client", "webclient"] if self.web_client else ["client"]
self.listeners.append({
"port": bind_port,
"bind_address": bind_host,
"tls": True,
"type": "http",
"resources": [
{
"names": names,
"compress": gzip_responses,
},
{
"names": ["federation"],
"compress": False,
}
]
})
unsecure_port = config.get("unsecure_port", bind_port - 400)
if unsecure_port:
self.listeners.append({
"port": unsecure_port,
"bind_address": bind_host,
"tls": False,
"type": "http",
"resources": [
{
"names": names,
"compress": gzip_responses,
},
{
"names": ["federation"],
"compress": False,
}
]
})
manhole = config.get("manhole")
if manhole:
self.listeners.append({
"port": manhole,
"bind_address": "127.0.0.1",
"type": "manhole",
})
metrics_port = config.get("metrics_port")
if metrics_port:
self.listeners.append({
"port": metrics_port,
"bind_address": config.get("metrics_bind_host", "127.0.0.1"),
"tls": False,
"type": "http",
"resources": [
{
"names": ["metrics"],
"compress": False,
},
]
})
# Attempt to guess the content_addr for the v0 content repostitory
content_addr = config.get("content_addr")
if not content_addr:
for listener in self.listeners:
if listener["type"] == "http" and not listener.get("tls", False):
unsecure_port = listener["port"]
break
else:
raise RuntimeError("Could not determine 'content_addr'")
host = self.server_name
if ':' not in host:
host = "%s:%d" % (host, self.unsecure_port)
host = "%s:%d" % (host, unsecure_port)
else:
host = host.split(':')[0]
host = "%s:%d" % (host, self.unsecure_port)
host = "%s:%d" % (host, unsecure_port)
content_addr = "http://%s" % (host,)
self.content_addr = content_addr
@@ -61,18 +132,6 @@ class ServerConfig(Config):
# e.g. matrix.org, localhost:8080, etc.
server_name: "%(server_name)s"
# The port to listen for HTTPS requests on.
# For when matrix traffic is sent directly to synapse.
bind_port: %(bind_port)s
# The port to listen for HTTP requests on.
# For when matrix traffic passes through loadbalancer that unwraps TLS.
unsecure_port: %(unsecure_port)s
# Local interface to listen on.
# The empty string will cause synapse to listen on all interfaces.
bind_host: ""
# When running as a daemon, the file to store the pid in
pid_file: %(pid_file)s
@@ -84,14 +143,64 @@ class ServerConfig(Config):
# hard limit.
soft_file_limit: 0
# Turn on the twisted telnet manhole service on localhost on the given
# port.
#manhole: 9000
# List of ports that Synapse should listen on, their purpose and their
# configuration.
listeners:
# Main HTTPS listener
# For when matrix traffic is sent directly to synapse.
-
# The port to listen for HTTPS requests on.
port: %(bind_port)s
# Should synapse compress HTTP responses to clients that support it?
# This should be disabled if running synapse behind a load balancer
# that can do automatic compression.
gzip_responses: True
# Local interface to listen on.
# The empty string will cause synapse to listen on all interfaces.
bind_address: ''
# This is a 'http' listener, allows us to specify 'resources'.
type: http
tls: true
# Use the X-Forwarded-For (XFF) header as the client IP and not the
# actual client IP.
x_forwarded: false
# List of HTTP resources to serve on this listener.
resources:
-
# List of resources to host on this listener.
names:
- client # The client-server APIs, both v1 and v2
- webclient # The bundled webclient.
# Should synapse compress HTTP responses to clients that support it?
# This should be disabled if running synapse behind a load balancer
# that can do automatic compression.
compress: true
- names: [federation] # Federation APIs
compress: false
# Unsecure HTTP listener,
# For when matrix traffic passes through loadbalancer that unwraps TLS.
- port: %(unsecure_port)s
tls: false
bind_address: ''
type: http
x_forwarded: false
resources:
- names: [client, webclient]
compress: true
- names: [federation]
compress: false
# Turn on the twisted telnet manhole service on localhost on the given
# port.
# - port: 9000
# bind_address: 127.0.0.1
# type: manhole
""" % locals()
def read_arguments(self, args):

View File

@@ -94,6 +94,7 @@ class TransportLayerServer(object):
yield self.keyring.verify_json_for_server(origin, json_request)
logger.info("Request from %s", origin)
request.authenticated_entity = origin
defer.returnValue((origin, content))

View File

@@ -177,7 +177,7 @@ class ApplicationServicesHandler(object):
return
user_info = yield self.store.get_user_by_id(user_id)
if not user_info:
if user_info:
defer.returnValue(False)
return

View File

@@ -380,15 +380,6 @@ class MessageHandler(BaseHandler):
if limit is None:
limit = 10
messages, token = yield self.store.get_recent_events_for_room(
room_id,
limit=limit,
end_token=now_token.room_key,
)
start_token = now_token.copy_and_replace("room_key", token[0])
end_token = now_token.copy_and_replace("room_key", token[1])
room_members = [
m for m in current_state.values()
if m.type == EventTypes.Member
@@ -396,19 +387,38 @@ class MessageHandler(BaseHandler):
]
presence_handler = self.hs.get_handlers().presence_handler
presence = []
for m in room_members:
try:
member_presence = yield presence_handler.get_state(
target_user=UserID.from_string(m.user_id),
auth_user=auth_user,
as_event=True,
)
presence.append(member_presence)
except SynapseError:
logger.exception(
"Failed to get member presence of %r", m.user_id
@defer.inlineCallbacks
def get_presence():
presence_defs = yield defer.DeferredList(
[
presence_handler.get_state(
target_user=UserID.from_string(m.user_id),
auth_user=auth_user,
as_event=True,
check_auth=False,
)
for m in room_members
],
consumeErrors=True,
)
defer.returnValue([p for success, p in presence_defs if success])
presence, (messages, token) = yield defer.gatherResults(
[
get_presence(),
self.store.get_recent_events_for_room(
room_id,
limit=limit,
end_token=now_token.room_key,
)
],
consumeErrors=True,
).addErrback(unwrapFirstError)
start_token = now_token.copy_and_replace("room_key", token[0])
end_token = now_token.copy_and_replace("room_key", token[1])
time_now = self.clock.time_msec()

View File

@@ -191,24 +191,24 @@ class PresenceHandler(BaseHandler):
defer.returnValue(False)
@defer.inlineCallbacks
def get_state(self, target_user, auth_user, as_event=False):
def get_state(self, target_user, auth_user, as_event=False, check_auth=True):
if self.hs.is_mine(target_user):
visible = yield self.is_presence_visible(
observer_user=auth_user,
observed_user=target_user
)
if check_auth:
visible = yield self.is_presence_visible(
observer_user=auth_user,
observed_user=target_user
)
if not visible:
raise SynapseError(404, "Presence information not visible")
state = yield self.store.get_presence_state(target_user.localpart)
if "mtime" in state:
del state["mtime"]
state["presence"] = state.pop("state")
if not visible:
raise SynapseError(404, "Presence information not visible")
if target_user in self._user_cachemap:
cached_state = self._user_cachemap[target_user].get_state()
if "last_active" in cached_state:
state["last_active"] = cached_state["last_active"]
state = self._user_cachemap[target_user].get_state()
else:
state = yield self.store.get_presence_state(target_user.localpart)
if "mtime" in state:
del state["mtime"]
state["presence"] = state.pop("state")
else:
# TODO(paul): Have remote server send us permissions set
state = self._get_or_offline_usercache(target_user).get_state()

View File

@@ -61,21 +61,31 @@ class SimpleHttpClient(object):
self.agent = Agent(reactor, pool=pool)
self.version_string = hs.version_string
def request(self, method, *args, **kwargs):
def request(self, method, uri, *args, **kwargs):
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
outgoing_requests_counter.inc(method)
d = preserve_context_over_fn(
self.agent.request,
method, *args, **kwargs
method, uri, *args, **kwargs
)
logger.info("Sending request %s %s", method, uri)
def _cb(response):
incoming_responses_counter.inc(method, response.code)
logger.info(
"Received response to %s %s: %s",
method, uri, response.code
)
return response
def _eb(failure):
incoming_responses_counter.inc(method, "ERR")
logger.info(
"Error sending request to %s %s: %s %s",
method, uri, failure.type, failure.getErrorMessage()
)
return failure
d.addCallbacks(_cb, _eb)
@@ -84,7 +94,9 @@ class SimpleHttpClient(object):
@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}):
# TODO: Do we ever want to log message contents?
logger.debug("post_urlencoded_get_json args: %s", args)
query_bytes = urllib.urlencode(args, True)
response = yield self.request(
@@ -97,7 +109,7 @@ class SimpleHttpClient(object):
bodyProducer=FileBodyProducer(StringIO(query_bytes))
)
body = yield readBody(response)
body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(json.loads(body))
@@ -105,7 +117,7 @@ class SimpleHttpClient(object):
def post_json_get_json(self, uri, post_json):
json_str = encode_canonical_json(post_json)
logger.info("HTTP POST %s -> %s", json_str, uri)
logger.debug("HTTP POST %s -> %s", json_str, uri)
response = yield self.request(
"POST",
@@ -116,7 +128,7 @@ class SimpleHttpClient(object):
bodyProducer=FileBodyProducer(StringIO(json_str))
)
body = yield readBody(response)
body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(json.loads(body))
@@ -149,7 +161,7 @@ class SimpleHttpClient(object):
})
)
body = yield readBody(response)
body = yield preserve_context_over_fn(readBody, response)
if 200 <= response.code < 300:
defer.returnValue(json.loads(body))
@@ -192,7 +204,7 @@ class SimpleHttpClient(object):
bodyProducer=FileBodyProducer(StringIO(json_str))
)
body = yield readBody(response)
body = yield preserve_context_over_fn(readBody, response)
if 200 <= response.code < 300:
defer.returnValue(json.loads(body))
@@ -226,7 +238,7 @@ class CaptchaServerHttpClient(SimpleHttpClient):
)
try:
body = yield readBody(response)
body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(body)
except PartialDownloadError as e:
# twisted dislikes google's response, no content length.

View File

@@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json
import simplejson as json
import logging
import sys
import urllib
import urlparse
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
metrics = synapse.metrics.get_metrics_for(__name__)
@@ -109,6 +111,8 @@ class MatrixFederationHttpClient(object):
self.clock = hs.get_clock()
self.version_string = hs.version_string
self._next_id = 1
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"",
@@ -123,88 +127,98 @@ class MatrixFederationHttpClient(object):
("", "", path_bytes, param_bytes, query_bytes, "",)
)
logger.info("Sending request to %s: %s %s",
destination, method, url_bytes)
txn_id = "%s-O-%s" % (method, self._next_id)
self._next_id = (self._next_id + 1) % (sys.maxint - 1)
logger.debug(
"Types: %s",
[
type(destination), type(method), type(path_bytes),
type(param_bytes),
type(query_bytes)
]
outbound_logger.info(
"{%s} [%s] Sending request: %s %s",
txn_id, destination, method, url_bytes
)
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
retries_left = 5
endpoint = self._getEndpoint(reactor, destination)
while True:
producer = None
if body_callback:
producer = body_callback(method, url_bytes, headers_dict)
try:
request_deferred = preserve_context_over_fn(
self.agent.request,
destination,
endpoint,
method,
path_bytes,
param_bytes,
query_bytes,
Headers(headers_dict),
producer
)
response = yield self.clock.time_bound_deferred(
request_deferred,
time_out=timeout/1000. if timeout else 60,
)
logger.debug("Got response to %s", method)
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn(
"DNS Lookup failed to %s with %s",
destination,
e
)
raise
logger.warn(
"Sending request failed to %s: %s %s: %s - %s",
destination,
method,
url_bytes,
type(e).__name__,
_flatten_response_never_received(e),
)
if retries_left and not timeout:
yield sleep(2 ** (5 - retries_left))
retries_left -= 1
else:
raise
logger.info(
"Received response %d %s for %s: %s %s",
response.code,
response.phrase,
destination,
method,
url_bytes
endpoint = preserve_context_over_fn(
self._getEndpoint, reactor, destination
)
log_result = None
try:
while True:
producer = None
if body_callback:
producer = body_callback(method, url_bytes, headers_dict)
try:
def send_request():
request_deferred = self.agent.request(
destination,
endpoint,
method,
path_bytes,
param_bytes,
query_bytes,
Headers(headers_dict),
producer
)
return self.clock.time_bound_deferred(
request_deferred,
time_out=timeout/1000. if timeout else 60,
)
response = yield preserve_context_over_fn(
send_request,
)
log_result = "%d %s" % (response.code, response.phrase,)
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn(
"DNS Lookup failed to %s with %s",
destination,
e
)
log_result = "DNS Lookup failed to %s with %s" % (
destination, e
)
raise
logger.warn(
"{%s} Sending request failed to %s: %s %s: %s - %s",
txn_id,
destination,
method,
url_bytes,
type(e).__name__,
_flatten_response_never_received(e),
)
log_result = "%s - %s" % (
type(e).__name__, _flatten_response_never_received(e),
)
if retries_left and not timeout:
yield sleep(2 ** (5 - retries_left))
retries_left -= 1
else:
raise
finally:
outbound_logger.info(
"{%s} [%s] Result: %s",
txn_id,
destination,
log_result,
)
if 200 <= response.code < 300:
pass
else:
# :'(
# Update transactions table?
body = yield readBody(response)
body = yield preserve_context_over_fn(readBody, response)
raise HttpResponseException(
response.code, response.phrase, body
)
@@ -284,10 +298,7 @@ class MatrixFederationHttpClient(object):
"Content-Type not application/json"
)
logger.debug("Getting resp body")
body = yield readBody(response)
logger.debug("Got resp body")
body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(json.loads(body))
@defer.inlineCallbacks
@@ -330,9 +341,7 @@ class MatrixFederationHttpClient(object):
"Content-Type not application/json"
)
logger.debug("Getting resp body")
body = yield readBody(response)
logger.debug("Got resp body")
body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(json.loads(body))
@@ -390,9 +399,7 @@ class MatrixFederationHttpClient(object):
"Content-Type not application/json"
)
logger.debug("Getting resp body")
body = yield readBody(response)
logger.debug("Got resp body")
body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(json.loads(body))
@@ -435,7 +442,10 @@ class MatrixFederationHttpClient(object):
headers = dict(response.headers.getAllRawHeaders())
try:
length = yield _readBodyToFile(response, output_stream, max_size)
length = yield preserve_context_over_fn(
_readBodyToFile,
response, output_stream, max_size
)
except:
logger.exception("Failed to download body")
raise

View File

@@ -79,53 +79,39 @@ def request_handler(request_handler):
_next_request_id += 1
with LoggingContext(request_id) as request_context:
request_context.request = request_id
code = None
start = self.clock.time_msec()
try:
logger.info(
"Received request: %s %s",
request.method, request.path
)
d = request_handler(self, request)
with PreserveLoggingContext():
yield d
code = request.code
except CodeMessageException as e:
code = e.code
if isinstance(e, SynapseError):
logger.info(
"%s SynapseError: %s - %s", request, code, e.msg
with request.processing():
try:
d = request_handler(self, request)
with PreserveLoggingContext():
yield d
except CodeMessageException as e:
code = e.code
if isinstance(e, SynapseError):
logger.info(
"%s SynapseError: %s - %s", request, code, e.msg
)
else:
logger.exception(e)
outgoing_responses_counter.inc(request.method, str(code))
respond_with_json(
request, code, cs_exception(e), send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
version_string=self.version_string,
)
except:
logger.exception(
"Failed handle request %s.%s on %r: %r",
request_handler.__module__,
request_handler.__name__,
self,
request
)
respond_with_json(
request,
500,
{"error": "Internal server error"},
send_cors=True
)
else:
logger.exception(e)
outgoing_responses_counter.inc(request.method, str(code))
respond_with_json(
request, code, cs_exception(e), send_cors=True,
pretty_print=_request_user_agent_is_curl(request),
version_string=self.version_string,
)
except:
code = 500
logger.exception(
"Failed handle request %s.%s on %r: %r",
request_handler.__module__,
request_handler.__name__,
self,
request
)
respond_with_json(
request,
500,
{"error": "Internal server error"},
send_cors=True
)
finally:
code = str(code) if code else "-"
end = self.clock.time_msec()
logger.info(
"Processed request: %dms %s %s %s",
end-start, code, request.method, request.path
)
return wrapped_request_handler

View File

@@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.async import run_on_reactor, ObservableDeferred
from synapse.types import StreamToken
import synapse.metrics
@@ -45,21 +45,11 @@ class _NotificationListener(object):
The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred.
"""
__slots__ = ["deferred"]
def __init__(self, deferred):
self.deferred = deferred
def notified(self):
return self.deferred.called
def notify(self, token):
""" Inform whoever is listening about the new events.
"""
try:
self.deferred.callback(token)
except defer.AlreadyCalledError:
pass
class _NotifierUserStream(object):
"""This represents a user connected to the event stream.
@@ -75,11 +65,12 @@ class _NotifierUserStream(object):
appservice=None):
self.user = str(user)
self.appservice = appservice
self.listeners = set()
self.rooms = set(rooms)
self.current_token = current_token
self.last_notified_ms = time_now_ms
self.notify_deferred = ObservableDeferred(defer.Deferred())
def notify(self, stream_key, stream_id, time_now_ms):
"""Notify any listeners for this user of a new event from an
event source.
@@ -91,12 +82,10 @@ class _NotifierUserStream(object):
self.current_token = self.current_token.copy_and_advance(
stream_key, stream_id
)
if self.listeners:
self.last_notified_ms = time_now_ms
listeners = self.listeners
self.listeners = set()
for listener in listeners:
listener.notify(self.current_token)
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
self.notify_deferred = ObservableDeferred(defer.Deferred())
noify_deferred.callback(self.current_token)
def remove(self, notifier):
""" Remove this listener from all the indexes in the Notifier
@@ -114,6 +103,18 @@ class _NotifierUserStream(object):
self.appservice, set()
).discard(self)
def count_listeners(self):
return len(self.notify_deferred.observers())
def new_listener(self, token):
"""Returns a deferred that is resolved when there is a new token
greater than the given token.
"""
if self.current_token.is_after(token):
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
class Notifier(object):
""" This class is responsible for notifying any listeners when there are
@@ -158,7 +159,7 @@ class Notifier(object):
for x in self.appservice_to_user_streams.values():
all_user_streams |= x
return sum(len(stream.listeners) for stream in all_user_streams)
return sum(stream.count_listeners() for stream in all_user_streams)
metrics.register_callback("listeners", count_listeners)
metrics.register_callback(
@@ -286,10 +287,6 @@ class Notifier(object):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
deferred = defer.Deferred()
time_now_ms = self.clock.time_msec()
user = str(user)
user_stream = self.user_to_user_stream.get(user)
if user_stream is None:
@@ -302,55 +299,44 @@ class Notifier(object):
rooms=rooms,
appservice=appservice,
current_token=current_token,
time_now_ms=time_now_ms,
time_now_ms=self.clock.time_msec(),
)
self._register_with_keys(user_stream)
result = None
if timeout:
# Will be set to a _NotificationListener that we'll be waiting on.
# Allows us to cancel it.
listener = None
def timed_out():
if listener:
listener.deferred.cancel()
timer = self.clock.call_later(timeout/1000., timed_out)
prev_token = from_token
while not result:
try:
current_token = user_stream.current_token
result = yield callback(prev_token, current_token)
if result:
break
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
# We need to supply the token we supplied to callback so
# that we don't miss any current_token updates.
prev_token = current_token
listener = user_stream.new_listener(prev_token)
yield listener.deferred
except defer.CancelledError:
break
self.clock.cancel_call_later(timer, ignore_errs=True)
else:
current_token = user_stream.current_token
listener = [_NotificationListener(deferred)]
if timeout and not current_token.is_after(from_token):
user_stream.listeners.add(listener[0])
if current_token.is_after(from_token):
result = yield callback(from_token, current_token)
else:
result = None
timer = [None]
if result:
user_stream.listeners.discard(listener[0])
defer.returnValue(result)
return
if timeout:
timed_out = [False]
def _timeout_listener():
timed_out[0] = True
timer[0] = None
user_stream.listeners.discard(listener[0])
listener[0].notify(current_token)
# We create multiple notification listeners so we have to manage
# canceling the timeout ourselves.
timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
while not result and not timed_out[0]:
new_token = yield deferred
deferred = defer.Deferred()
listener[0] = _NotificationListener(deferred)
user_stream.listeners.add(listener[0])
result = yield callback(current_token, new_token)
current_token = new_token
if timer[0] is not None:
try:
self.clock.cancel_call_later(timer[0])
except:
logger.exception("Failed to cancel notifer timer")
defer.returnValue(result)
@@ -368,6 +354,9 @@ class Notifier(object):
@defer.inlineCallbacks
def check_for_updates(before_token, after_token):
if not after_token.is_after(before_token):
defer.returnValue(None)
events = []
end_token = from_token
for name, source in self.event_sources.sources.items():
@@ -402,7 +391,7 @@ class Notifier(object):
expired_streams = []
expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS
for stream in self.user_to_user_stream.values():
if stream.listeners:
if stream.count_listeners():
continue
if stream.last_notified_ms < expire_before_ts:
expired_streams.append(stream)

View File

@@ -39,10 +39,10 @@ class HttpTransactionStore(object):
A tuple of (HTTP response code, response content) or None.
"""
try:
logger.debug("get_response Key: %s TxnId: %s", key, txn_id)
logger.debug("get_response TxnId: %s", txn_id)
(last_txn_id, response) = self.transactions[key]
if txn_id == last_txn_id:
logger.info("get_response: Returning a response for %s", key)
logger.info("get_response: Returning a response for %s", txn_id)
return response
except KeyError:
pass
@@ -58,7 +58,7 @@ class HttpTransactionStore(object):
txn_id (str): The transaction ID for this request.
response (tuple): A tuple of (HTTP response code, response content)
"""
logger.debug("store_response Key: %s TxnId: %s", key, txn_id)
logger.debug("store_response TxnId: %s", txn_id)
self.transactions[key] = (txn_id, response)
def store_client_transaction(self, request, txn_id, response):

View File

@@ -132,16 +132,8 @@ class BaseHomeServer(object):
setattr(BaseHomeServer, "get_%s" % (depname), _get)
def get_ip_from_request(self, request):
# May be an X-Forwarding-For header depending on config
ip_addr = request.getClientIP()
if self.config.captcha_ip_origin_is_x_forwarded:
# use the header
if request.requestHeaders.hasHeader("X-Forwarded-For"):
ip_addr = request.requestHeaders.getRawHeaders(
"X-Forwarded-For"
)[0]
return ip_addr
# X-Forwarded-For is handled by our custom request type.
return request.getClientIP()
def is_mine(self, domain_specific_string):
return domain_specific_string.domain == self.hostname

View File

@@ -0,0 +1 @@
SELECT 1;

View File

@@ -91,8 +91,12 @@ class Clock(object):
with PreserveLoggingContext():
return reactor.callLater(delay, wrapped_callback, *args, **kwargs)
def cancel_call_later(self, timer):
timer.cancel()
def cancel_call_later(self, timer, ignore_errs=False):
try:
timer.cancel()
except:
if not ignore_errs:
raise
def time_bound_deferred(self, given_deferred, time_out):
if given_deferred.called:

View File

@@ -38,6 +38,9 @@ class ObservableDeferred(object):
deferred.
If consumeErrors is true errors will be captured from the origin deferred.
Cancelling or otherwise resolving an observer will not affect the original
ObservableDeferred.
"""
__slots__ = ["_deferred", "_observers", "_result"]
@@ -45,7 +48,7 @@ class ObservableDeferred(object):
def __init__(self, deferred, consumeErrors=False):
object.__setattr__(self, "_deferred", deferred)
object.__setattr__(self, "_result", None)
object.__setattr__(self, "_observers", [])
object.__setattr__(self, "_observers", set())
def callback(r):
self._result = (True, r)
@@ -74,12 +77,21 @@ class ObservableDeferred(object):
def observe(self):
if not self._result:
d = defer.Deferred()
self._observers.append(d)
def remove(r):
self._observers.discard(d)
return r
d.addBoth(remove)
self._observers.add(d)
return d
else:
success, res = self._result
return defer.succeed(res) if success else defer.fail(res)
def observers(self):
return self._observers
def __getattr__(self, name):
return getattr(self._deferred, name)

View File

@@ -140,6 +140,37 @@ class PreserveLoggingContext(object):
)
class _PreservingContextDeferred(defer.Deferred):
"""A deferred that ensures that all callbacks and errbacks are called with
the given logging context.
"""
def __init__(self, context):
self._log_context = context
defer.Deferred.__init__(self)
def addCallbacks(self, callback, errback=None,
callbackArgs=None, callbackKeywords=None,
errbackArgs=None, errbackKeywords=None):
callback = self._wrap_callback(callback)
errback = self._wrap_callback(errback)
return defer.Deferred.addCallbacks(
self, callback,
errback=errback,
callbackArgs=callbackArgs,
callbackKeywords=callbackKeywords,
errbackArgs=errbackArgs,
errbackKeywords=errbackKeywords,
)
def _wrap_callback(self, f):
def g(res, *args, **kwargs):
with PreserveLoggingContext():
LoggingContext.thread_local.current_context = self._log_context
res = f(res, *args, **kwargs)
return res
return g
def preserve_context_over_fn(fn, *args, **kwargs):
"""Takes a function and invokes it with the given arguments, but removes
and restores the current logging context while doing so.
@@ -160,24 +191,7 @@ def preserve_context_over_deferred(deferred):
"""Given a deferred wrap it such that any callbacks added later to it will
be invoked with the current context.
"""
d = defer.Deferred()
current_context = LoggingContext.current_context()
def cb(res):
with PreserveLoggingContext():
LoggingContext.thread_local.current_context = current_context
res = d.callback(res)
return res
def eb(failure):
with PreserveLoggingContext():
LoggingContext.thread_local.current_context = current_context
res = d.errback(failure)
return res
if deferred.called:
return deferred
deferred.addCallbacks(cb, eb)
d = _PreservingContextDeferred(current_context)
deferred.chainDeferred(d)
return d

View File

@@ -57,6 +57,49 @@ class AppServiceHandlerTestCase(unittest.TestCase):
interested_service, event
)
@defer.inlineCallbacks
def test_query_user_exists_unknown_user(self):
user_id = "@someone:anywhere"
services = [self._mkservice(is_interested=True)]
services[0].is_interested_in_user = Mock(return_value=True)
self.mock_store.get_app_services = Mock(return_value=services)
self.mock_store.get_user_by_id = Mock(return_value=None)
event = Mock(
sender=user_id,
type="m.room.message",
room_id="!foo:bar"
)
self.mock_as_api.push = Mock()
self.mock_as_api.query_user = Mock()
yield self.handler.notify_interested_services(event)
self.mock_as_api.query_user.assert_called_once_with(
services[0], user_id
)
@defer.inlineCallbacks
def test_query_user_exists_known_user(self):
user_id = "@someone:anywhere"
services = [self._mkservice(is_interested=True)]
services[0].is_interested_in_user = Mock(return_value=True)
self.mock_store.get_app_services = Mock(return_value=services)
self.mock_store.get_user_by_id = Mock(return_value={
"name": user_id
})
event = Mock(
sender=user_id,
type="m.room.message",
room_id="!foo:bar"
)
self.mock_as_api.push = Mock()
self.mock_as_api.query_user = Mock()
yield self.handler.notify_interested_services(event)
self.assertFalse(
self.mock_as_api.query_user.called,
"query_user called when it shouldn't have been."
)
@defer.inlineCallbacks
def test_query_room_alias_exists(self):
room_alias_str = "#foo:bar"

View File

@@ -114,6 +114,8 @@ class MockHttpResource(HttpServer):
mock_request.method = http_method
mock_request.uri = path
mock_request.getClientIP.return_value = "-"
mock_request.requestHeaders.getRawHeaders.return_value=[
"X-Matrix origin=test,key=,sig="
]