
Compare commits


2 Commits

Author          SHA1        Message                                                                                Date
Erik Johnston   75bf48b905  Update tracer to give more information                                                 2015-03-13 11:38:57 +00:00
Erik Johnston   d1ae594ae5  Add a utility class that can be used to generate a twisted deferred aware call graph  2015-03-12 16:52:02 +00:00
57 changed files with 723 additions and 1315 deletions


@@ -1,12 +1,3 @@
Changes in synapse v0.8.1 (2015-03-18)
======================================
* Disable registration by default. New users can be added using the command
``register_new_matrix_user`` or by enabling registration in the config.
* Add metrics to synapse. To enable metrics use config options
``enable_metrics`` and ``metrics_port``.
* Fix bug where banning only kicked the user.
Changes in synapse v0.8.0 (2015-03-06)
======================================
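
The ``enable_metrics`` and ``metrics_port`` options mentioned in the changelog entry above map onto the ``--enable-metrics`` and ``--metrics-port`` arguments of the MetricsConfig class removed further down in this compare. As a rough illustration only (the port number here is invented, not taken from this diff), enabling them from the command line would look something like::

    $ python -m synapse.app.homeserver -c homeserver.yaml --enable-metrics --metrics-port 9090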


@@ -1,5 +1,3 @@
.. contents::
Introduction
============
@@ -128,17 +126,6 @@ To set up your homeserver, run (in your virtualenv, as before)::
Substituting your host and domain name as appropriate.
By default, registration of new users is disabled. You can either enable
registration in the config (it is then recommended to also set up CAPTCHA), or
you can use the command line to register new users::
$ source ~/.synapse/bin/activate
$ register_new_matrix_user -c homeserver.yaml https://localhost:8448
New user localpart: erikj
Password:
Confirm password:
Success!
For reliable VoIP calls to be routed via this homeserver, you MUST configure
a TURN server. See docs/turn-howto.rst for details.
@@ -263,8 +250,7 @@ fix try re-installing from PyPI or directly from
ArchLinux
---------
If running `$ synctl start` fails with 'returned non-zero exit status 1',
you will need to explicitly call Python2.7 - either running as::
If running `$ synctl start` fails wit 'returned non-zero exit status 1', you will need to explicitly call Python2.7 - either running as::
$ python2.7 -m synapse.app.homeserver --daemonize -c homeserver.yaml --pid-file homeserver.pid


@@ -175,12 +175,13 @@ sub on_room_message
my $verto_connecting = $loop->new_future;
$bot_verto->connect(
%{ $CONFIG{"verto-bot"} },
on_connected => sub {
warn("[Verto] connected to websocket");
$verto_connecting->done($bot_verto) if not $verto_connecting->is_done;
},
on_connect_error => sub { die "Cannot connect to verto - $_[-1]" },
on_resolve_error => sub { die "Cannot resolve to verto - $_[-1]" },
)->then( sub {
warn("[Verto] connected to websocket");
$verto_connecting->done($bot_verto) if not $verto_connecting->is_done;
});
);
Future->needs_all(
$bot_matrix->login( %{ $CONFIG{"matrix-bot"} } )->then( sub {


@@ -86,7 +86,7 @@ sub create_virtual_user
"user": "$localpart"
}
EOT
)->get;
)->get;
warn $response->as_string if ($response->code != 200);
}
@@ -266,21 +266,17 @@ my $as_url = $CONFIG{"matrix-bot"}->{as_url};
Future->needs_all(
$http->do_request(
method => "POST",
uri => URI->new( $CONFIG{"matrix"}->{server}."/_matrix/appservice/v1/register" ),
content_type => "application/json",
content => <<EOT
method => "POST",
uri => URI->new( $CONFIG{"matrix"}->{server}."/_matrix/appservice/v1/register" ),
content_type => "application/json",
content => <<EOT
{
"as_token": "$as_token",
"url": "$as_url",
"namespaces": { "users": [ { "regex": "\@\\\\+.*", "exclusive": false } ] }
"namespaces": { "users": ["\@\\\\+.*"] }
}
EOT
)->then( sub{
my ($response) = (@_);
warn $response->as_string if ($response->code != 200);
return Future->done;
}),
),
$verto_connecting,
)->get;


@@ -1,149 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import getpass
import hashlib
import hmac
import json
import sys
import urllib2
import yaml
def request_registration(user, password, server_location, shared_secret):
mac = hmac.new(
key=shared_secret,
msg=user,
digestmod=hashlib.sha1,
).hexdigest()
data = {
"user": user,
"password": password,
"mac": mac,
"type": "org.matrix.login.shared_secret",
}
server_location = server_location.rstrip("/")
print "Sending registration request..."
req = urllib2.Request(
"%s/_matrix/client/api/v1/register" % (server_location,),
data=json.dumps(data),
headers={'Content-Type': 'application/json'}
)
try:
f = urllib2.urlopen(req)
f.read()
f.close()
print "Success."
except urllib2.HTTPError as e:
print "ERROR! Received %d %s" % (e.code, e.reason,)
if 400 <= e.code < 500:
if e.info().type == "application/json":
resp = json.load(e)
if "error" in resp:
print resp["error"]
sys.exit(1)
def register_new_user(user, password, server_location, shared_secret):
if not user:
try:
default_user = getpass.getuser()
except:
default_user = None
if default_user:
user = raw_input("New user localpart [%s]: " % (default_user,))
if not user:
user = default_user
else:
user = raw_input("New user localpart: ")
if not user:
print "Invalid user name"
sys.exit(1)
if not password:
password = getpass.getpass("Password: ")
if not password:
print "Password cannot be blank."
sys.exit(1)
confirm_password = getpass.getpass("Confirm password: ")
if password != confirm_password:
print "Passwords do not match"
sys.exit(1)
request_registration(user, password, server_location, shared_secret)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Used to register new users with a given home server when"
" registration has been disabled. The home server must be"
" configured with the 'registration_shared_secret' option"
" set.",
)
parser.add_argument(
"-u", "--user",
default=None,
help="Local part of the new user. Will prompt if omitted.",
)
parser.add_argument(
"-p", "--password",
default=None,
help="New password for user. Will prompt if omitted.",
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"-c", "--config",
type=argparse.FileType('r'),
help="Path to server config file. Used to read in shared secret.",
)
group.add_argument(
"-k", "--shared-secret",
help="Shared secret as defined in server config file.",
)
parser.add_argument(
"server_url",
default="https://localhost:8448",
nargs='?',
help="URL to use to talk to the home server. Defaults to "
" 'https://localhost:8448'.",
)
args = parser.parse_args()
if "config" in args and args.config:
config = yaml.safe_load(args.config)
secret = config.get("registration_shared_secret", None)
if not secret:
print "No 'registration_shared_secret' defined in config."
sys.exit(1)
else:
secret = args.shared_secret
register_new_user(args.user, args.password, args.server_url, secret)

scripts/graph_tracer.py (new file)

@@ -0,0 +1,213 @@
import fileinput
import pydot
import sys
import itertools
import json
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return itertools.izip(a, b)
nodes = {}
edges = set()
graph = pydot.Dot(graph_name="call_graph", graph_type="digraph")
names = {}
starts = {}
ends = {}
deferreds = set()
deferreds_map = {}
deferred_edges = set()
root_id = None
for line in fileinput.input():
line = line.strip()
try:
if " calls " in line:
start, end = line.split(" calls ")
start, end = start.strip(), end.strip()
edges.add((start, end))
# print start, end
if " named " in line:
node_id, name = line.split(" named ")
names[node_id.strip()] = name.strip()
if name.strip() == "synapse.rest.client.v1.room.RoomSendEventRestServlet.on_PUT":
root_id = node_id
if " in " in line:
node_id, d = line.split(" in ")
deferreds_map[node_id.strip()] = d.strip()
if " is deferred" in line:
node_id, _ = line.split(" is deferred")
deferreds.add(node_id)
if " start " in line:
node_id, ms = line.split(" start ")
starts[node_id.strip()] = int(ms.strip())
if " end " in line:
node_id, ms = line.split(" end ")
ends[node_id.strip()] = int(ms.strip())
if " waits on " in line:
start, end = line.split(" waits on ")
start, end = start.strip(), end.strip()
deferred_edges.add((start, end))
# print start, end
except Exception as e:
sys.stderr.write("failed %s to parse '%s'\n" % (e.message, line))
if not root_id:
sys.stderr.write("Could not find root")
sys.exit(1)
# deferreds_root = set(deferreds.values())
# for parent, child in deferred_edges:
# deferreds_root.discard(child)
#
# deferred_tree = {
# d: {}
# for d in deferreds_root
# }
#
# def populate(root, tree):
# for leaf in deferred_edges.get(root, []):
# populate(leaf, tree.setdefault(leaf, {}))
#
#
# for d in deferreds_root:
# tree = deferred_tree.setdefault(d, {})
# populate(d, tree)
# print deferred_edges
# print root_id
def is_in_deferred(d):
while True:
if d == root_id:
return True
for start, end in deferred_edges:
if d == end:
d = start
break
else:
return False
def walk_graph(d):
res = [d]
while d != root_id:
for start, end in edges:
if d == end:
d = start
res.append(d)
break
else:
return res
return res
def make_tree_el(node_id):
return {
"id": node_id,
"name": names[node_id],
"children": [],
"start": starts[node_id],
"end": ends[node_id],
"size": ends[node_id] - starts[node_id],
}
tree = make_tree_el(root_id)
tree_index = {
root_id: tree,
}
viz_out = {
"nodes": [],
"edges": [],
}
for node_id, name in names.items():
# if times.get(node_id, 100) < 5:
# continue
walk = walk_graph(node_id)
# print walk
if root_id not in walk:
continue
if node_id in deferreds:
if not is_in_deferred(node_id):
continue
elif node_id in deferreds_map:
if not is_in_deferred(deferreds_map[node_id]):
continue
walk_names = [
names[w].split("synapse.", 1)[1] for w in walk
]
for child, parent in reversed(list(pairwise(walk))):
if parent in tree_index and child not in tree_index:
el = make_tree_el(child)
tree_index[parent]["children"].append(el)
tree_index[child] = el
# print "-".join(reversed(["end"] + walk_names)) + ", " + str(ends[node_id] - starts[node_id])
# print "%d,%s,%s,%s" % (len(walk), walk_names[0], starts[node_id], ends[node_id])
viz_out["nodes"].append({
"id": node_id,
"label": names[node_id].split("synapse.", 1)[1],
"value": ends[node_id] - starts[node_id],
"level": len(walk),
})
node = pydot.Node(node_id, label=name)
# if node_id in deferreds:
# clusters[deferreds[node_id]].add_node(node)
# elif node_id in clusters:
# clusters[node_id].add_node(node)
# else:
# graph.add_node(node)
graph.add_node(node)
nodes[node_id] = node
# print node_id
# for el in tree_index.values():
# el["children"].sort(key=lambda e: e["start"])
#
# print json.dumps(tree)
for parent, child in edges:
if child not in nodes:
# sys.stderr.write(child + " not a node\n")
continue
if parent not in nodes:
# sys.stderr.write(parent + " not a node\n")
continue
viz_out["edges"].append({
"from": parent,
"to": child,
"value": ends[child] - starts[child],
})
edge = pydot.Edge(nodes[parent], nodes[child])
graph.add_edge(edge)
print json.dumps(viz_out)
file_prefix = "call_graph_out"
graph.write('%s.dot' % file_prefix, format='raw', prog='dot')
graph.write_svg("%s.svg" % file_prefix, prog='dot')
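
The script reads its trace lines from the files named on its command line (or from stdin, via ``fileinput``), prints the node/edge JSON to stdout, and writes ``call_graph_out.dot`` and ``call_graph_out.svg`` through pydot. A typical invocation would therefore be something like::

    $ python scripts/graph_tracer.py trace.log > call_graph.json

The input format is implied by the string matching above. The identifiers in the following sample are invented for illustration, since the Tracer that actually emits these lines (``synapse.util.traceutil``) is not included in this compare::

    1 named synapse.rest.client.v1.room.RoomSendEventRestServlet.on_PUT
    1 start 1000
    1 calls 2
    2 named synapse.handlers.message.MessageHandler.create_and_send_event
    2 start 1001
    2 end 1040
    1 end 1050

Deferred bookkeeping uses the additional ``<id> is deferred``, ``<id> in <deferred>`` and ``<id> waits on <id>`` line forms.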


@@ -45,7 +45,7 @@ setup(
version=version,
packages=find_packages(exclude=["tests", "tests.*"]),
description="Reference Synapse Home Server",
install_requires=dependencies['requirements'](include_conditional=True).keys(),
install_requires=dependencies["REQUIREMENTS"].keys(),
setup_requires=[
"Twisted==14.0.2", # Here to override setuptools_trial's dependency on Twisted>=2.4.0
"setuptools_trial",
@@ -55,5 +55,5 @@ setup(
include_package_data=True,
zip_safe=False,
long_description=long_description,
scripts=["synctl", "register_new_matrix_user"],
scripts=["synctl"],
)


@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.8.1-r1"
__version__ = "0.8.0"


@@ -28,12 +28,6 @@ import logging
logger = logging.getLogger(__name__)
AuthEventTypes = (
EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels,
EventTypes.JoinRules,
)
class Auth(object):
def __init__(self, hs):
@@ -172,7 +166,6 @@ class Auth(object):
target = auth_events.get(key)
target_in_room = target and target.membership == Membership.JOIN
target_banned = target and target.membership == Membership.BAN
key = (EventTypes.JoinRules, "", )
join_rule_event = auth_events.get(key)
@@ -201,7 +194,6 @@ class Auth(object):
{
"caller_in_room": caller_in_room,
"caller_invited": caller_invited,
"target_banned": target_banned,
"target_in_room": target_in_room,
"membership": membership,
"join_rule": join_rule,
@@ -210,11 +202,6 @@ class Auth(object):
}
)
if ban_level:
ban_level = int(ban_level)
else:
ban_level = 50 # FIXME (erikj): What should we do here?
if Membership.INVITE == membership:
# TODO (erikj): We should probably handle this more intelligently
# PRIVATE join rules.
@@ -225,10 +212,6 @@ class Auth(object):
403,
"%s not in room %s." % (event.user_id, event.room_id,)
)
elif target_banned:
raise AuthError(
403, "%s is banned from the room" % (target_user_id,)
)
elif target_in_room: # the target is already in the room.
raise AuthError(403, "%s is already in the room." %
target_user_id)
@@ -238,8 +221,6 @@ class Auth(object):
# joined: It's a NOOP
if event.user_id != target_user_id:
raise AuthError(403, "Cannot force another user to join.")
elif target_banned:
raise AuthError(403, "You are banned from this room")
elif join_rule == JoinRules.PUBLIC:
pass
elif join_rule == JoinRules.INVITE:
@@ -257,10 +238,6 @@ class Auth(object):
403,
"%s not in room %s." % (target_user_id, event.room_id,)
)
elif target_banned and user_level < ban_level:
raise AuthError(
403, "You cannot unban user &s." % (target_user_id,)
)
elif target_user_id != event.user_id:
if kick_level:
kick_level = int(kick_level)
@@ -272,6 +249,11 @@ class Auth(object):
403, "You cannot kick user %s." % target_user_id
)
elif Membership.BAN == membership:
if ban_level:
ban_level = int(ban_level)
else:
ban_level = 50 # FIXME (erikj): What should we do here?
if user_level < ban_level:
raise AuthError(403, "You don't have permission to ban")
else:
@@ -388,7 +370,7 @@ class Auth(object):
AuthError if no user by that token exists or the token is invalid.
"""
try:
ret = yield self.store.get_user_by_token(token)
ret = yield self.store.get_user_by_token(token=token)
if not ret:
raise StoreError(400, "Unknown token")
user_info = {
@@ -430,6 +412,12 @@ class Auth(object):
builder.auth_events = auth_events_entries
context.auth_events = {
k: v
for k, v in context.current_state.items()
if v.event_id in auth_ids
}
def compute_auth_events(self, event, current_state):
if event.type == EventTypes.Create:
return []


@@ -60,7 +60,6 @@ class LoginType(object):
EMAIL_IDENTITY = u"m.login.email.identity"
RECAPTCHA = u"m.login.recaptcha"
APPLICATION_SERVICE = u"m.login.application_service"
SHARED_SECRET = u"org.matrix.login.shared_secret"
class EventTypes(object):


@@ -47,11 +47,12 @@ from synapse.crypto import context_factory
from synapse.util.logcontext import LoggingContext
from synapse.rest.client.v1 import ClientV1RestResource
from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from daemonize import Daemonize
import twisted.manhole.telnet
from synapse.util.traceutil import Tracer
import synapse
import logging
@@ -60,6 +61,8 @@ import re
import resource
import subprocess
import sqlite3
import syweb
logger = logging.getLogger(__name__)
@@ -82,7 +85,6 @@ class SynapseHomeServer(HomeServer):
return AppServiceRestResource(self)
def build_resource_for_web_client(self):
import syweb
syweb_path = os.path.dirname(syweb.__file__)
webclient_path = os.path.join(syweb_path, "webclient")
return File(webclient_path) # TODO configurable?
@@ -101,12 +103,6 @@ class SynapseHomeServer(HomeServer):
def build_resource_for_server_key(self):
return LocalKey(self)
def build_resource_for_metrics(self):
if self.get_config().enable_metrics:
return MetricsResource(self)
else:
return None
def build_db_pool(self):
return adbapi.ConnectionPool(
"sqlite3", self.get_db_name(),
@@ -117,7 +113,7 @@ class SynapseHomeServer(HomeServer):
# so that :memory: sqlite works
)
def create_resource_tree(self, redirect_root_to_web_client):
def create_resource_tree(self, web_client, redirect_root_to_web_client):
"""Create the resource tree for this Home Server.
This in unduly complicated because Twisted does not support putting
@@ -129,9 +125,6 @@ class SynapseHomeServer(HomeServer):
location of the web client. This does nothing if web_client is not
True.
"""
config = self.get_config()
web_client = config.web_client
# list containing (path_str, Resource) e.g:
# [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ]
desired_tree = [
@@ -155,10 +148,6 @@ class SynapseHomeServer(HomeServer):
else:
self.root_resource = Resource()
metrics_resource = self.get_resource_for_metrics()
if config.metrics_port is None and metrics_resource is not None:
desired_tree.append((METRICS_PREFIX, metrics_resource))
# ideally we'd just use getChild and putChild but getChild doesn't work
# unless you give it a Request object IN ADDITION to the name :/ So
# instead, we'll store a copy of this mapping so we can actually add
@@ -220,32 +209,17 @@ class SynapseHomeServer(HomeServer):
"""
return "%s-%s" % (resource, path_seg)
def start_listening(self):
config = self.get_config()
if not config.no_tls and config.bind_port is not None:
def start_listening(self, secure_port, unsecure_port):
if secure_port is not None:
reactor.listenSSL(
config.bind_port,
Site(self.root_resource),
self.tls_context_factory,
interface=config.bind_host
secure_port, Site(self.root_resource), self.tls_context_factory
)
logger.info("Synapse now listening on port %d", config.bind_port)
if config.unsecure_port is not None:
logger.info("Synapse now listening on port %d", secure_port)
if unsecure_port is not None:
reactor.listenTCP(
config.unsecure_port,
Site(self.root_resource),
interface=config.bind_host
unsecure_port, Site(self.root_resource)
)
logger.info("Synapse now listening on port %d", config.unsecure_port)
metrics_resource = self.get_resource_for_metrics()
if metrics_resource and config.metrics_port is not None:
reactor.listenTCP(
config.metrics_port, Site(metrics_resource), interface="127.0.0.1",
)
logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port)
logger.info("Synapse now listening on port %d", unsecure_port)
def get_version_string():
@@ -343,8 +317,7 @@ def setup(config_options):
config.setup_logging()
# check any extra requirements we have now we have a config
check_requirements(config)
check_requirements()
version_string = get_version_string()
@@ -370,6 +343,7 @@ def setup(config_options):
)
hs.create_resource_tree(
web_client=config.webclient,
redirect_root_to_web_client=True,
)
@@ -398,7 +372,11 @@ def setup(config_options):
f.namespace['hs'] = hs
reactor.listenTCP(config.manhole, f, interface='127.0.0.1')
hs.start_listening()
bind_port = config.bind_port
if config.no_tls:
bind_port = None
hs.start_listening(bind_port, config.unsecure_port)
hs.get_pusherpool().start()
hs.get_state_handler().start_caching()
@@ -424,8 +402,13 @@ class SynapseService(service.Service):
def run(hs):
def in_thread():
try:
tracer = Tracer()
sys.settrace(tracer.process)
except Exception:
logger.exception("Failed to start tracer")
with LoggingContext("run"):
change_resource_limit(hs.config.soft_file_limit)
@@ -451,7 +434,6 @@ def run(hs):
def main():
with LoggingContext("main"):
# check base requirements
check_requirements()
hs = setup(sys.argv[1:])
run(hs)
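
The ``run()`` change above installs the tracer inside ``in_thread()`` with ``sys.settrace(tracer.process)``, but ``synapse/util/traceutil.py`` itself is not part of this excerpt. The following is only a hedged sketch of what such a hook could look like; every name in it is illustrative, and the Deferred bookkeeping that the commit message describes ("twisted deferred aware") is omitted::

    import itertools
    import logging
    import sys
    import time

    logger = logging.getLogger("tracer")


    class Tracer(object):
        """Hypothetical stand-in for synapse.util.traceutil.Tracer: emits the
        "<id> named/calls/start/end" lines that scripts/graph_tracer.py parses."""

        def __init__(self):
            self._ids = itertools.count(1)
            self._frame_ids = {}

        def process(self, frame, event, arg):
            now_ms = int(time.time() * 1000)
            if event == "call":
                node_id = next(self._ids)
                self._frame_ids[frame] = node_id
                name = "%s.%s" % (frame.f_globals.get("__name__"),
                                  frame.f_code.co_name)
                logger.info("%d named %s", node_id, name)
                caller_id = self._frame_ids.get(frame.f_back)
                if caller_id is not None:
                    logger.info("%d calls %d", caller_id, node_id)
                logger.info("%d start %d", node_id, now_ms)
            elif event == "return":
                node_id = self._frame_ids.pop(frame, None)
                if node_id is not None:
                    logger.info("%d end %d", node_id, now_ms)
            # Returning the hook keeps it installed as the local trace
            # function, so the matching "return" events are seen as well.
            return self.process


    tracer = Tracer()
    sys.settrace(tracer.process)

Since ``sys.settrace()`` only affects the calling thread, installing it inside ``in_thread()`` (as the diff does) rather than in ``setup()`` is presumably deliberate.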


@@ -23,13 +23,11 @@ from .captcha import CaptchaConfig
from .email import EmailConfig
from .voip import VoipConfig
from .registration import RegistrationConfig
from .metrics import MetricsConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
EmailConfig, VoipConfig, RegistrationConfig,
MetricsConfig,):
EmailConfig, VoipConfig, RegistrationConfig,):
pass


@@ -1,36 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import Config
class MetricsConfig(Config):
def __init__(self, args):
super(MetricsConfig, self).__init__(args)
self.enable_metrics = args.enable_metrics
self.metrics_port = args.metrics_port
@classmethod
def add_arguments(cls, parser):
super(MetricsConfig, cls).add_arguments(parser)
metrics_group = parser.add_argument_group("metrics")
metrics_group.add_argument(
'--enable-metrics', dest="enable_metrics", action="store_true",
help="Enable collection and rendering of performance metrics"
)
metrics_group.add_argument(
'--metrics-port', metavar="PORT", type=int,
help="Separate port to accept metrics requests on (on localhost)"
)


@@ -15,46 +15,19 @@
from ._base import Config
from synapse.util.stringutils import random_string_with_symbols
import distutils.util
class RegistrationConfig(Config):
def __init__(self, args):
super(RegistrationConfig, self).__init__(args)
# `args.disable_registration` may either be a bool or a string depending
# on if the option was given a value (e.g. --disable-registration=false
# would set `args.disable_registration` to "false" not False.)
self.disable_registration = bool(
distutils.util.strtobool(str(args.disable_registration))
)
self.registration_shared_secret = args.registration_shared_secret
self.disable_registration = args.disable_registration
@classmethod
def add_arguments(cls, parser):
super(RegistrationConfig, cls).add_arguments(parser)
reg_group = parser.add_argument_group("registration")
reg_group.add_argument(
"--disable-registration",
const=True,
default=True,
nargs='?',
help="Disable registration of new users.",
action='store_true',
help="Disable registration of new users."
)
reg_group.add_argument(
"--registration-shared-secret", type=str,
help="If set, allows registration by anyone who also has the shared"
" secret, even if registration is otherwise disabled.",
)
@classmethod
def generate_config(cls, args, config_dir_path):
if args.disable_registration is None:
args.disable_registration = True
if args.registration_shared_secret is None:
args.registration_shared_secret = random_string_with_symbols(50)


@@ -28,7 +28,7 @@ class ServerConfig(Config):
self.unsecure_port = args.unsecure_port
self.daemonize = args.daemonize
self.pid_file = self.abspath(args.pid_file)
self.web_client = args.web_client
self.webclient = True
self.manhole = args.manhole
self.soft_file_limit = args.soft_file_limit
@@ -68,8 +68,6 @@ class ServerConfig(Config):
server_group.add_argument('--pid-file', default="homeserver.pid",
help="When running as a daemon, the file to"
" store the pid in")
server_group.add_argument('--web_client', default=True, type=bool,
help="Whether or not to serve a web client")
server_group.add_argument("--manhole", metavar="PORT", dest="manhole",
type=int,
help="Turn on the twisted telnet manhole"


@@ -16,7 +16,8 @@
class EventContext(object):
def __init__(self, current_state=None):
def __init__(self, current_state=None, auth_events=None):
self.current_state = current_state
self.auth_events = auth_events
self.state_group = None
self.rejected = False


@@ -25,7 +25,6 @@ from synapse.api.errors import (
from synapse.util.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
import synapse.metrics
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
@@ -37,17 +36,9 @@ import random
logger = logging.getLogger(__name__)
# synapse.federation.federation_client is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
sent_edus_counter = metrics.register_counter("sent_edus")
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
class FederationClient(FederationBase):
def __init__(self):
self._get_pdu_cache = None
def start_get_pdu_cache(self):
self._get_pdu_cache = ExpiringCache(
@@ -77,8 +68,6 @@ class FederationClient(FederationBase):
order = self._order
self._order += 1
sent_pdus_destination_dist.inc_by(len(destinations))
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
# TODO, add errback, etc.
@@ -98,8 +87,6 @@ class FederationClient(FederationBase):
content=content,
)
sent_edus_counter.inc()
# TODO, add errback, etc.
self._transaction_queue.enqueue_edu(edu)
return defer.succeed(None)
@@ -126,8 +113,6 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc(query_type)
return self.transport_layer.make_query(
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
)


@@ -22,7 +22,6 @@ from .units import Transaction, Edu
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.events import FrozenEvent
import synapse.metrics
from synapse.api.errors import FederationError, SynapseError
@@ -33,15 +32,6 @@ import logging
logger = logging.getLogger(__name__)
# synapse.federation.federation_server is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
received_pdus_counter = metrics.register_counter("received_pdus")
received_edus_counter = metrics.register_counter("received_edus")
received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
class FederationServer(FederationBase):
def set_handler(self, handler):
@@ -94,8 +84,6 @@ class FederationServer(FederationBase):
def on_incoming_transaction(self, transaction_data):
transaction = Transaction(**transaction_data)
received_pdus_counter.inc_by(len(transaction.pdus))
for p in transaction.pdus:
if "unsigned" in p:
unsigned = p["unsigned"]
@@ -165,8 +153,6 @@ class FederationServer(FederationBase):
defer.returnValue((200, response))
def received_edu(self, origin, edu_type, content):
received_edus_counter.inc()
if edu_type in self.edu_handlers:
self.edu_handlers[edu_type](origin, content)
else:
@@ -218,8 +204,6 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.inc(query_type)
if query_type in self.query_handlers:
response = yield self.query_handlers[query_type](args)
defer.returnValue((200, response))


@@ -25,15 +25,12 @@ from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination,
)
import synapse.metrics
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
class TransactionQueue(object):
"""This class makes sure we only have one transaction in flight at
@@ -57,25 +54,11 @@ class TransactionQueue(object):
# done
self.pending_transactions = {}
metrics.register_callback(
"pending_destinations",
lambda: len(self.pending_transactions),
)
# Is a mapping from destination -> list of
# tuple(pending pdus, deferred, order)
self.pending_pdus_by_dest = pdus = {}
self.pending_pdus_by_dest = {}
# destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = edus = {}
metrics.register_callback(
"pending_pdus",
lambda: sum(map(len, pdus.values())),
)
metrics.register_callback(
"pending_edus",
lambda: sum(map(len, edus.values())),
)
self.pending_edus_by_dest = {}
# destination -> list of tuple(failure, deferred)
self.pending_failures_by_dest = {}


@@ -148,10 +148,6 @@ class BaseFederationServlet(object):
logger.exception("authenticate_request failed")
raise
defer.returnValue(response)
# Extra logic that functools.wraps() doesn't finish
new_code.__self__ = code.__self__
return new_code
def register(self, server):


@@ -90,8 +90,8 @@ class BaseHandler(object):
event = builder.build()
logger.debug(
"Created event %s with current state: %s",
event.event_id, context.current_state,
"Created event %s with auth_events: %s, current state: %s",
event.event_id, context.auth_events, context.current_state,
)
defer.returnValue(
@@ -106,7 +106,7 @@ class BaseHandler(object):
# We now need to go and hit out to wherever we need to hit out to.
if not suppress_auth:
self.auth.check(event, auth_events=context.current_state)
self.auth.check(event, auth_events=context.auth_events)
yield self.store.persist_event(event, context=context)
@@ -142,16 +142,7 @@ class BaseHandler(object):
"Failed to get destination from event %s", s.event_id
)
# Don't block waiting on waking up all the listeners.
d = self.notifier.on_new_room_event(event, extra_users=extra_users)
def log_failure(f):
logger.warn(
"Failed to notify about %s: %s",
event.event_id, f.value
)
d.addErrback(log_failure)
yield self.notifier.on_new_room_event(event, extra_users=extra_users)
yield federation_handler.handle_new_event(
event, destinations=destinations,


@@ -73,7 +73,6 @@ class FederationHandler(BaseHandler):
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@log_function
@defer.inlineCallbacks
def handle_new_event(self, event, destinations):
""" Takes in an event from the client to server side, that has already
@@ -290,8 +289,6 @@ class FederationHandler(BaseHandler):
"""
logger.debug("Joining %s to %s", joinee, room_id)
yield self.store.clean_room_for_join(room_id)
origin, pdu = yield self.replication_layer.make_join(
target_hosts,
room_id,
@@ -466,9 +463,11 @@ class FederationHandler(BaseHandler):
builder=builder,
)
self.auth.check(event, auth_events=context.current_state)
self.auth.check(event, auth_events=context.auth_events)
defer.returnValue(event)
pdu = event
defer.returnValue(pdu)
@defer.inlineCallbacks
@log_function
@@ -705,7 +704,7 @@ class FederationHandler(BaseHandler):
)
if not auth_events:
auth_events = context.current_state
auth_events = context.auth_events
logger.debug(
"_handle_new_event: %s, auth_events: %s",


@@ -21,7 +21,6 @@ from synapse.api.constants import PresenceState
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import UserID
import synapse.metrics
from ._base import BaseHandler
@@ -30,8 +29,6 @@ import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
# TODO(paul): Maybe there's one of these I can steal from somewhere
def partition(l, func):
@@ -136,11 +133,6 @@ class PresenceHandler(BaseHandler):
self._user_cachemap = {}
self._user_cachemap_latest_serial = 0
metrics.register_callback(
"userCachemap:size",
lambda: len(self._user_cachemap),
)
def _get_or_make_usercache(self, user):
"""If the cache entry doesn't exist, initialise a new one."""
if user not in self._user_cachemap:


@@ -31,7 +31,6 @@ import base64
import bcrypt
import json
import logging
import urllib
logger = logging.getLogger(__name__)
@@ -64,13 +63,6 @@ class RegistrationHandler(BaseHandler):
password_hash = bcrypt.hashpw(password, bcrypt.gensalt())
if localpart:
if localpart and urllib.quote(localpart) != localpart:
raise SynapseError(
400,
"User ID must only contain characters which do not"
" require URL encoding."
)
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()


@@ -15,7 +15,6 @@
from synapse.api.errors import CodeMessageException
from syutil.jsonutil import encode_canonical_json
import synapse.metrics
from twisted.internet import defer, reactor
from twisted.web.client import (
@@ -32,17 +31,6 @@ import urllib
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
outgoing_requests_counter = metrics.register_counter(
"requests",
labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
class SimpleHttpClient(object):
"""
@@ -57,30 +45,12 @@ class SimpleHttpClient(object):
self.agent = Agent(reactor)
self.version_string = hs.version_string
def request(self, method, *args, **kwargs):
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
outgoing_requests_counter.inc(method)
d = self.agent.request(method, *args, **kwargs)
def _cb(response):
incoming_responses_counter.inc(method, response.code)
return response
def _eb(failure):
incoming_responses_counter.inc(method, "ERR")
return failure
d.addCallbacks(_cb, _eb)
return d
@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}):
logger.debug("post_urlencoded_get_json args: %s", args)
query_bytes = urllib.urlencode(args, True)
response = yield self.request(
response = yield self.agent.request(
"POST",
uri.encode("ascii"),
headers=Headers({
@@ -100,7 +70,7 @@ class SimpleHttpClient(object):
logger.info("HTTP POST %s -> %s", json_str, uri)
response = yield self.request(
response = yield self.agent.request(
"POST",
uri.encode("ascii"),
headers=Headers({
@@ -134,7 +104,7 @@ class SimpleHttpClient(object):
query_bytes = urllib.urlencode(args, True)
uri = "%s?%s" % (uri, query_bytes)
response = yield self.request(
response = yield self.agent.request(
"GET",
uri.encode("ascii"),
headers=Headers({
@@ -175,7 +145,7 @@ class SimpleHttpClient(object):
json_str = encode_canonical_json(json_body)
response = yield self.request(
response = yield self.agent.request(
"PUT",
uri.encode("ascii"),
headers=Headers({
@@ -206,7 +176,7 @@ class CaptchaServerHttpClient(SimpleHttpClient):
def post_urlencoded_get_raw(self, url, args={}):
query_bytes = urllib.urlencode(args, True)
response = yield self.request(
response = yield self.agent.request(
"POST",
url.encode("ascii"),
bodyProducer=FileBodyProducer(StringIO(query_bytes)),


@@ -23,7 +23,6 @@ from twisted.web._newclient import ResponseDone
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.async import sleep
from synapse.util.logcontext import PreserveLoggingContext
import synapse.metrics
from syutil.jsonutil import encode_canonical_json
@@ -41,17 +40,6 @@ import urlparse
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
outgoing_requests_counter = metrics.register_counter(
"requests",
labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
class MatrixFederationHttpAgent(_AgentBase):
@@ -61,8 +49,6 @@ class MatrixFederationHttpAgent(_AgentBase):
def request(self, destination, endpoint, method, path, params, query,
headers, body_producer):
outgoing_requests_counter.inc(method)
host = b""
port = 0
fragment = b""
@@ -73,21 +59,9 @@ class MatrixFederationHttpAgent(_AgentBase):
# Set the connection pool key to be the destination.
key = destination
d = self._requestWithEndpoint(key, endpoint, method, parsed_URI,
headers, body_producer,
parsed_URI.originForm)
def _cb(response):
incoming_responses_counter.inc(method, response.code)
return response
def _eb(failure):
incoming_responses_counter.inc(method, "ERR")
return failure
d.addCallbacks(_cb, _eb)
return d
return self._requestWithEndpoint(key, endpoint, method, parsed_URI,
headers, body_producer,
parsed_URI.originForm)
class MatrixFederationHttpClient(object):


@@ -18,7 +18,6 @@ from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError
)
from synapse.util.logcontext import LoggingContext
import synapse.metrics
from syutil.jsonutil import (
encode_canonical_json, encode_pretty_printed_json
@@ -35,22 +34,6 @@ import urllib
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
incoming_requests_counter = metrics.register_counter(
"requests",
labels=["method", "servlet"],
)
outgoing_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
response_timer = metrics.register_distribution(
"response_time",
labels=["method", "servlet"]
)
class HttpServer(object):
""" Interface for registering callbacks on a HTTP server
@@ -91,7 +74,6 @@ class JsonResource(HttpServer, resource.Resource):
self.clock = hs.get_clock()
self.path_regexs = {}
self.version_string = hs.version_string
self.hs = hs
def register_path(self, method, path_pattern, callback):
self.path_regexs.setdefault(method, []).append(
@@ -105,11 +87,7 @@ class JsonResource(HttpServer, resource.Resource):
port (int): The port to listen on.
"""
reactor.listenTCP(
port,
server.Site(self),
interface=self.hs.config.bind_host
)
reactor.listenTCP(port, server.Site(self))
# Gets called by twisted
def render(self, request):
@@ -153,15 +131,6 @@ class JsonResource(HttpServer, resource.Resource):
# returned response. We pass both the request and any
# matched groups from the regex to the callback.
callback = path_entry.callback
servlet_instance = getattr(callback, "__self__", None)
if servlet_instance is not None:
servlet_classname = servlet_instance.__class__.__name__
else:
servlet_classname = "%r" % callback
incoming_requests_counter.inc(request.method, servlet_classname)
args = [
urllib.unquote(u).decode("UTF-8") for u in m.groups()
]
@@ -171,13 +140,12 @@ class JsonResource(HttpServer, resource.Resource):
request.method, request.path
)
code, response = yield callback(request, *args)
self._send_response(request, code, response)
response_timer.inc_by(
self.clock.time_msec() - start, request.method, servlet_classname
code, response = yield path_entry.callback(
request,
*args
)
self._send_response(request, code, response)
return
# Huh. No one wanted to handle that? Fiiiiiine. Send 400.
@@ -222,8 +190,6 @@ class JsonResource(HttpServer, resource.Resource):
request)
return
outgoing_responses_counter.inc(request.method, str(code))
# TODO: Only enable CORS for the requests that need it.
respond_with_json(
request, code, response_json_object,


@@ -51,8 +51,8 @@ class RestServlet(object):
pattern = self.PATTERN
for method in ("GET", "PUT", "POST", "OPTIONS", "DELETE"):
if hasattr(self, "on_%s" % (method,)):
method_handler = getattr(self, "on_%s" % (method,))
if hasattr(self, "on_%s" % (method)):
method_handler = getattr(self, "on_%s" % (method))
http_server.register_path(method, pattern, method_handler)
else:
raise NotImplementedError("RestServlet must register something.")


@@ -1,111 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Because otherwise 'resource' collides with synapse.metrics.resource
from __future__ import absolute_import
import logging
from resource import getrusage, getpagesize, RUSAGE_SELF
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
)
logger = logging.getLogger(__name__)
# We'll keep all the available metrics in a single toplevel dict, one shared
# for the entire process. We don't currently support per-HomeServer instances
# of metrics, because in practice any one python VM will host only one
# HomeServer anyway. This makes a lot of implementation neater
all_metrics = {}
class Metrics(object):
""" A single Metrics object gives a (mutable) slice view of the all_metrics
dict, allowing callers to easily register new metrics that are namespaced
nicely."""
def __init__(self, name):
self.name_prefix = name
def _register(self, metric_class, name, *args, **kwargs):
full_name = "%s_%s" % (self.name_prefix, name)
metric = metric_class(full_name, *args, **kwargs)
all_metrics[full_name] = metric
return metric
def register_counter(self, *args, **kwargs):
return self._register(CounterMetric, *args, **kwargs)
def register_callback(self, *args, **kwargs):
return self._register(CallbackMetric, *args, **kwargs)
def register_distribution(self, *args, **kwargs):
return self._register(DistributionMetric, *args, **kwargs)
def register_cache(self, *args, **kwargs):
return self._register(CacheMetric, *args, **kwargs)
def get_metrics_for(pkg_name):
""" Returns a Metrics instance for conveniently creating metrics
namespaced with the given name prefix. """
# Convert a "package.name" to "package_name" because Prometheus doesn't
# let us use . in metric names
return Metrics(pkg_name.replace(".", "_"))
def render_all():
strs = []
# TODO(paul): Internal hack
update_resource_metrics()
for name in sorted(all_metrics.keys()):
try:
strs += all_metrics[name].render()
except Exception:
strs += ["# FAILED to render %s" % name]
logger.exception("Failed to render %s metric", name)
strs.append("") # to generate a final CRLF
return "\n".join(strs)
# Now register some standard process-wide state metrics, to give indications of
# process resource usage
rusage = None
PAGE_SIZE = getpagesize()
def update_resource_metrics():
global rusage
rusage = getrusage(RUSAGE_SELF)
resource_metrics = get_metrics_for("process.resource")
# msecs
resource_metrics.register_callback("utime", lambda: rusage.ru_utime * 1000)
resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000)
# pages
resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * PAGE_SIZE)


@@ -1,155 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import chain
# TODO(paul): I can't believe Python doesn't have one of these
def map_concat(func, items):
# flatten a list-of-lists
return list(chain.from_iterable(map(func, items)))
class BaseMetric(object):
def __init__(self, name, labels=[]):
self.name = name
self.labels = labels # OK not to clone as we never write it
def dimension(self):
return len(self.labels)
def is_scalar(self):
return not len(self.labels)
def _render_labelvalue(self, value):
# TODO: some kind of value escape
return '"%s"' % (value)
def _render_key(self, values):
if self.is_scalar():
return ""
return "{%s}" % (
",".join(["%s=%s" % (k, self._render_labelvalue(v))
for k, v in zip(self.labels, values)])
)
def render(self):
return map_concat(self.render_item, sorted(self.counts.keys()))
class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing
integer that counts events."""
def __init__(self, *args, **kwargs):
super(CounterMetric, self).__init__(*args, **kwargs)
self.counts = {}
# Scalar metrics are never empty
if self.is_scalar():
self.counts[()] = 0
def inc_by(self, incr, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
if values not in self.counts:
self.counts[values] = incr
else:
self.counts[values] += incr
def inc(self, *values):
self.inc_by(1, *values)
def render_item(self, k):
return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
it is rendered. Typically this is used to implement gauges that yield the
size or other state of some in-memory object by actively querying it."""
def __init__(self, name, callback, labels=[]):
super(CallbackMetric, self).__init__(name, labels=labels)
self.callback = callback
def render(self):
value = self.callback()
if self.is_scalar():
return ["%s %d" % (self.name, value)]
return ["%s%s %d" % (self.name, self._render_key(k), value[k])
for k in sorted(value.keys())]
class DistributionMetric(object):
"""A combination of an event counter and an accumulator, which counts
both the number of events and accumulates the total value. Typically this
could be used to keep track of method-running times, or other distributions
of values that occur in discrete occurances.
TODO(paul): Try to export some heatmap-style stats?
"""
def __init__(self, name, *args, **kwargs):
self.counts = CounterMetric(name + ":count", **kwargs)
self.totals = CounterMetric(name + ":total", **kwargs)
def inc_by(self, inc, *values):
self.counts.inc(*values)
self.totals.inc_by(inc, *values)
def render(self):
return self.counts.render() + self.totals.render()
class CacheMetric(object):
"""A combination of two CounterMetrics, one to count cache hits and one to
count a total, and a callback metric to yield the current size.
This metric generates standard metric name pairs, so that monitoring rules
can easily be applied to measure hit ratio."""
def __init__(self, name, size_callback, labels=[]):
self.name = name
self.hits = CounterMetric(name + ":hits", labels=labels)
self.total = CounterMetric(name + ":total", labels=labels)
self.size = CallbackMetric(
name + ":size",
callback=size_callback,
labels=labels,
)
def inc_hits(self, *values):
self.hits.inc(*values)
self.total.inc(*values)
def inc_misses(self, *values):
self.total.inc(*values)
def render(self):
return self.hits.render() + self.total.render() + self.size.render()
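
For orientation, these are the classes that the metrics call sites earlier in this compare are giving up. A condensed sketch of that usage pattern, with the registration and increment calls taken from the ``synapse.federation.federation_client`` hunk above and the counted values invented::

    metrics = synapse.metrics.get_metrics_for("synapse.federation.client")

    sent_edus_counter = metrics.register_counter("sent_edus")
    sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])

    sent_edus_counter.inc()
    sent_queries_counter.inc("profile")

    # render_item()/_render_key() above would then emit lines such as:
    #   synapse_federation_client_sent_edus 1
    #   synapse_federation_client_sent_queries{type="profile"} 1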


@@ -1,39 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.web.resource import Resource
import synapse.metrics
METRICS_PREFIX = "/_synapse/metrics"
class MetricsResource(Resource):
isLeaf = True
def __init__(self, hs):
Resource.__init__(self) # Resource is old-style, so no super()
self.hs = hs
def render_GET(self, request):
response = synapse.metrics.render_all()
request.setHeader("Content-Type", "text/plain")
request.setHeader("Content-Length", str(len(response)))
# Encode as UTF-8 (default)
return response.encode()


@@ -19,27 +19,12 @@ from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.async import run_on_reactor
from synapse.types import StreamToken
import synapse.metrics
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
notified_events_counter = metrics.register_counter("notified_events")
# TODO(paul): Should be shared somewhere
def count(func, l):
"""Return the number of items in l for which func returns true."""
n = 0
for x in l:
if func(x):
n += 1
return n
class _NotificationListener(object):
""" This represents a single client connection to the events stream.
@@ -74,7 +59,6 @@ class _NotificationListener(object):
try:
self.deferred.callback(result)
notified_events_counter.inc_by(len(events))
except defer.AlreadyCalledError:
pass
@@ -111,35 +95,6 @@ class Notifier(object):
"user_joined_room", self._user_joined_room
)
# This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
def count_listeners():
all_listeners = set()
for x in self.room_to_listeners.values():
all_listeners |= x
for x in self.user_to_listeners.values():
all_listeners |= x
for x in self.appservice_to_listeners.values():
all_listeners |= x
return len(all_listeners)
metrics.register_callback("listeners", count_listeners)
metrics.register_callback(
"rooms",
lambda: count(bool, self.room_to_listeners.values()),
)
metrics.register_callback(
"users",
lambda: count(bool, self.user_to_listeners.values()),
)
metrics.register_callback(
"appservices",
lambda: count(bool, self.appservice_to_listeners.values()),
)
@log_function
@defer.inlineCallbacks
def on_new_room_event(self, event, extra_users=[]):


@@ -32,7 +32,7 @@ class Pusher(object):
INITIAL_BACKOFF = 1000
MAX_BACKOFF = 60 * 60 * 1000
GIVE_UP_AFTER = 24 * 60 * 60 * 1000
DEFAULT_ACTIONS = ['dont_notify']
DEFAULT_ACTIONS = ['dont-notify']
INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
@@ -105,11 +105,7 @@ class Pusher(object):
room_member_count += 1
for r in rules:
if r['rule_id'] in enabled_map:
r['enabled'] = enabled_map[r['rule_id']]
elif 'enabled' not in r:
r['enabled'] = True
if not r['enabled']:
if r['rule_id'] in enabled_map and not enabled_map[r['rule_id']]:
continue
matches = True
@@ -128,21 +124,13 @@ class Pusher(object):
# ignore rules with no actions (we have an explict 'dont_notify')
if len(actions) == 0:
logger.warn(
"Ignoring rule id %s with no actions for user %s",
r['rule_id'], self.user_name
"Ignoring rule id %s with no actions for user %s" %
(r['rule_id'], r['user_name'])
)
continue
if matches:
logger.info(
"%s matches for user %s, event %s",
r['rule_id'], self.user_name, ev['event_id']
)
defer.returnValue(actions)
logger.info(
"No rules match for user %s, event %s",
self.user_name, ev['event_id']
)
defer.returnValue(Pusher.DEFAULT_ACTIONS)
@staticmethod


@@ -6,51 +6,36 @@ def list_with_base_rules(rawrules, user_name):
# shove the server default rules for each kind onto the end of each
current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
ruleslist.extend(make_base_prepend_rules(
user_name, PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
for r in rawrules:
if r['priority_class'] < current_prio_class:
while r['priority_class'] < current_prio_class:
ruleslist.extend(make_base_append_rules(
ruleslist.extend(make_base_rules(
user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
current_prio_class -= 1
if current_prio_class > 0:
ruleslist.extend(make_base_prepend_rules(
user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
ruleslist.append(r)
while current_prio_class > 0:
ruleslist.extend(make_base_append_rules(
ruleslist.extend(make_base_rules(
user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
current_prio_class -= 1
if current_prio_class > 0:
ruleslist.extend(make_base_prepend_rules(
user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
return ruleslist
def make_base_append_rules(user, kind):
def make_base_rules(user, kind):
rules = []
if kind == 'override':
rules = make_base_append_override_rules()
rules = make_base_override_rules()
elif kind == 'underride':
rules = make_base_append_underride_rules(user)
rules = make_base_underride_rules(user)
elif kind == 'content':
rules = make_base_append_content_rules(user)
rules = make_base_content_rules(user)
for r in rules:
r['priority_class'] = PRIORITY_CLASS_MAP[kind]
@@ -59,20 +44,7 @@ def make_base_append_rules(user, kind):
return rules
def make_base_prepend_rules(user, kind):
rules = []
if kind == 'override':
rules = make_base_prepend_override_rules()
for r in rules:
r['priority_class'] = PRIORITY_CLASS_MAP[kind]
r['default'] = True # Deprecated, left for backwards compat
return rules
def make_base_append_content_rules(user):
def make_base_content_rules(user):
return [
{
'rule_id': 'global/content/.m.rule.contains_user_name',
@@ -96,20 +68,7 @@ def make_base_append_content_rules(user):
]
def make_base_prepend_override_rules():
return [
{
'rule_id': 'global/override/.m.rule.master',
'enabled': False,
'conditions': [],
'actions': [
"dont_notify"
]
}
]
def make_base_append_override_rules():
def make_base_override_rules():
return [
{
'rule_id': 'global/override/.m.rule.call',
@@ -127,7 +86,7 @@ def make_base_append_override_rules():
'value': 'ring'
}, {
'set_tweak': 'highlight',
'value': False
'value': 'false'
}
]
},
@@ -176,14 +135,14 @@ def make_base_append_override_rules():
'value': 'default'
}, {
'set_tweak': 'highlight',
'value': False
'value': 'false'
}
]
}
]
def make_base_append_underride_rules(user):
def make_base_underride_rules(user):
return [
{
'rule_id': 'global/underride/.m.rule.invite_for_me',
@@ -211,7 +170,7 @@ def make_base_append_underride_rules(user):
'value': 'default'
}, {
'set_tweak': 'highlight',
'value': False
'value': 'false'
}
]
},
@@ -227,7 +186,7 @@ def make_base_append_underride_rules(user):
'actions': [
'notify', {
'set_tweak': 'highlight',
'value': False
'value': 'false'
}
]
},
@@ -243,7 +202,7 @@ def make_base_append_underride_rules(user):
'actions': [
'notify', {
'set_tweak': 'highlight',
'value': False
'value': 'false'
}
]
}


@@ -5,6 +5,7 @@ logger = logging.getLogger(__name__)
REQUIREMENTS = {
"syutil>=0.0.3": ["syutil"],
"matrix_angular_sdk>=0.6.4": ["syweb>=0.6.4"],
"Twisted==14.0.2": ["twisted==14.0.2"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
@@ -17,19 +18,6 @@ REQUIREMENTS = {
"pillow": ["PIL"],
"pydenticon": ["pydenticon"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
"matrix_angular_sdk>=0.6.5": ["syweb>=0.6.5"],
}
}
def requirements(config=None, include_conditional=False):
reqs = REQUIREMENTS.copy()
for key, req in CONDITIONAL_REQUIREMENTS.items():
if (config and getattr(config, key)) or include_conditional:
reqs.update(req)
return reqs
def github_link(project, version, egg):
@@ -48,8 +36,8 @@ DEPENDENCY_LINKS = [
),
github_link(
project="matrix-org/matrix-angular-sdk",
version="v0.6.5",
egg="matrix_angular_sdk-0.6.5",
version="v0.6.4",
egg="matrix_angular_sdk-0.6.4",
),
]
@@ -58,11 +46,10 @@ class MissingRequirementError(Exception):
pass
def check_requirements(config=None):
def check_requirements():
"""Checks that all the modules needed by synapse have been correctly
installed and are at the correct version"""
for dependency, module_requirements in (
requirements(config, include_conditional=False).items()):
for dependency, module_requirements in REQUIREMENTS.items():
for module_requirement in module_requirements:
if ">=" in module_requirement:
module_name, required_version = module_requirement.split(">=")
@@ -123,7 +110,7 @@ def list_requirements():
egg = link.split("#egg=")[1]
linked.append(egg.split('-')[0])
result.append(link)
for requirement in requirements(include_conditional=True):
for requirement in REQUIREMENTS:
is_linked = False
for link in linked:
if requirement.replace('-', '_').startswith(link):


@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensensed under the Apache License, Version 2.0 (the "License");
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
@@ -89,8 +89,7 @@ def _parse_json(request):
if type(content) != dict:
raise SynapseError(400, "Content must be a JSON object.")
return content
except ValueError as e:
logger.warn(e)
except ValueError:
raise SynapseError(400, "Content not JSON.")


@@ -156,12 +156,9 @@ class PushRuleRestServlet(ClientV1RestServlet):
template_rule = _rule_to_template(r)
if template_rule:
template_rule['enabled'] = True
if r['rule_id'] in enabled_map:
template_rule['enabled'] = enabled_map[r['rule_id']]
elif 'enabled' in r:
template_rule['enabled'] = r['enabled']
else:
template_rule['enabled'] = True
rulearray.append(template_rule)
path = request.postpath[1:]


@@ -27,6 +27,7 @@ from hashlib import sha1
import hmac
import simplejson as json
import logging
import urllib
logger = logging.getLogger(__name__)
@@ -109,22 +110,14 @@ class RegisterRestServlet(ClientV1RestServlet):
login_type = register_json["type"]
is_application_server = login_type == LoginType.APPLICATION_SERVICE
is_using_shared_secret = login_type == LoginType.SHARED_SECRET
can_register = (
not self.disable_registration
or is_application_server
or is_using_shared_secret
)
if not can_register:
if self.disable_registration and not is_application_server:
raise SynapseError(403, "Registration has been disabled")
stages = {
LoginType.RECAPTCHA: self._do_recaptcha,
LoginType.PASSWORD: self._do_password,
LoginType.EMAIL_IDENTITY: self._do_email_identity,
LoginType.APPLICATION_SERVICE: self._do_app_service,
LoginType.SHARED_SECRET: self._do_shared_secret,
LoginType.APPLICATION_SERVICE: self._do_app_service
}
session_info = self._get_session_info(request, session)
@@ -262,11 +255,14 @@ class RegisterRestServlet(ClientV1RestServlet):
)
password = register_json["password"].encode("utf-8")
desired_user_id = (
register_json["user"].encode("utf-8")
if "user" in register_json else None
)
desired_user_id = (register_json["user"].encode("utf-8")
if "user" in register_json else None)
if (desired_user_id
and urllib.quote(desired_user_id) != desired_user_id):
raise SynapseError(
400,
"User ID must only contain characters which do not " +
"require URL encoding.")
handler = self.handlers.registration_handler
(user_id, token) = yield handler.register(
localpart=desired_user_id,
@@ -308,51 +304,6 @@ class RegisterRestServlet(ClientV1RestServlet):
"home_server": self.hs.hostname,
})
@defer.inlineCallbacks
def _do_shared_secret(self, request, register_json, session):
yield run_on_reactor()
if not isinstance(register_json.get("mac", None), basestring):
raise SynapseError(400, "Expected mac.")
if not isinstance(register_json.get("user", None), basestring):
raise SynapseError(400, "Expected 'user' key.")
if not isinstance(register_json.get("password", None), basestring):
raise SynapseError(400, "Expected 'password' key.")
if not self.hs.config.registration_shared_secret:
raise SynapseError(400, "Shared secret registration is not enabled")
user = register_json["user"].encode("utf-8")
# str() because otherwise hmac complains that 'unicode' does not
# have the buffer interface
got_mac = str(register_json["mac"])
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret,
msg=user,
digestmod=sha1,
).hexdigest()
password = register_json["password"].encode("utf-8")
if compare_digest(want_mac, got_mac):
handler = self.handlers.registration_handler
user_id, token = yield handler.register(
localpart=user,
password=password,
)
self._remove_session(session)
defer.returnValue({
"user_id": user_id,
"access_token": token,
"home_server": self.hs.hostname,
})
else:
raise SynapseError(
403, "HMAC incorrect",
)
def _parse_json(request):
try:
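
The mac that the removed _do_shared_secret handler verifies is an
HMAC-SHA1 over the desired localpart, keyed with the homeserver's
registration_shared_secret and compared with compare_digest. The client
side of that handshake, which a tool like register_new_matrix_user would
compute, is just::

    import hmac
    from hashlib import sha1

    def registration_mac(shared_secret, localpart):
        # str() on both inputs matters on Python 2: hmac rejects unicode,
        # which is why the handler above coerces register_json["mac"].
        return hmac.new(
            key=str(shared_secret),
            msg=str(localpart),
            digestmod=sha1,
        ).hexdigest()

    print(registration_mac("s3cr3t", "erikj"))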

View File

@@ -165,6 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
defer.returnValue((200, {}))
def trace(f):
f.should_trace = True
f.root_trace = True
return f
# TODO: Needs unit testing for generic events + feedback
class RoomSendEventRestServlet(ClientV1RestServlet):
@@ -175,7 +181,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_POST(self, request, room_id, event_type, txn_id=None):
import inspect
frame = inspect.currentframe()
logger.info("Frame: %s", id(frame))
user, client = yield self.auth.get_user_by_req(request)
logger.info("Frame: %s", id(inspect.currentframe()))
content = _parse_json(request)
msg_handler = self.handlers.message_handler
@@ -189,12 +199,14 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
client=client,
txn_id=txn_id,
)
logger.info("Frame: %s", id(inspect.currentframe()))
defer.returnValue((200, {"event_id": event.event_id}))
def on_GET(self, request, room_id, event_type, txn_id):
return (200, "Not implemented")
@trace
@defer.inlineCallbacks
def on_PUT(self, request, room_id, event_type, txn_id):
try:
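
The @trace decorator added above carries no wrapping logic at all: it only
marks the function object, and the tracer in synapse/util/traceutil.py
(added later in this diff) walks frame.f_back chains looking for those
attributes to decide what to record. Stripped to its essentials::

    def trace(f):
        f.should_trace = True
        f.root_trace = True
        return f

    @trace
    def on_PUT():
        pass

    print(getattr(on_PUT, 'should_trace', False))  # True
    print(getattr(on_PUT, 'root_trace', False))    # True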

View File

@@ -56,7 +56,6 @@ class BaseHomeServer(object):
"""
DEPENDENCIES = [
'config',
'clock',
'http_client',
'db_name',
@@ -80,7 +79,6 @@ class BaseHomeServer(object):
'resource_for_server_key',
'resource_for_media_repository',
'resource_for_app_services',
'resource_for_metrics',
'event_sources',
'ratelimiter',
'keyring',
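
Each name in DEPENDENCIES becomes a get_<name>() accessor on the
homeserver, so dropping 'clock' and 'resource_for_metrics' here removes
get_clock() and get_resource_for_metrics() from the injection surface. A
toy illustration of that contract (the real class generates concrete
get_<name> methods rather than using __getattr__)::

    class Container(object):
        DEPENDENCIES = ['clock', 'http_client']

        def __init__(self, **deps):
            self._deps = deps

        def __getattr__(self, name):
            if name.startswith('get_') and name[4:] in self.DEPENDENCIES:
                return lambda: self._deps.get(name[4:])
            raise AttributeError(name)

    hs = Container(clock='a-clock')
    print(hs.get_clock())  # a-clock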

View File

@@ -21,7 +21,6 @@ from synapse.util.async import run_on_reactor
from synapse.util.expiringcache import ExpiringCache
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.api.auth import AuthEventTypes
from synapse.events.snapshot import EventContext
from collections import namedtuple
@@ -39,6 +38,12 @@ def _get_state_key_from_event(event):
KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
AuthEventTypes = (
EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels,
EventTypes.JoinRules,
)
SIZE_OF_CACHE = 1000
EVICTION_TIMEOUT_SECONDS = 20
@@ -134,6 +139,18 @@ class StateHandler(object):
}
context.state_group = None
if hasattr(event, "auth_events") and event.auth_events:
auth_ids = self.hs.get_auth().compute_auth_events(
event, context.current_state
)
context.auth_events = {
k: v
for k, v in context.current_state.items()
if v.event_id in auth_ids
}
else:
context.auth_events = {}
if event.is_state():
key = (event.type, event.state_key)
if key in context.current_state:
@@ -170,6 +187,18 @@ class StateHandler(object):
replaces = context.current_state[key]
event.unsigned["replaces_state"] = replaces.event_id
if hasattr(event, "auth_events") and event.auth_events:
auth_ids = self.hs.get_auth().compute_auth_events(
event, context.current_state
)
context.auth_events = {
k: v
for k, v in context.current_state.items()
if v.event_id in auth_ids
}
else:
context.auth_events = {}
context.prev_state_events = prev_state
defer.returnValue(context)
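
Both hunks add the same computation: ask the Auth module which event ids
authorise the event, then keep just those entries of the current state
map. As a standalone function (argument names mine)::

    def pick_auth_events(event, current_state, compute_auth_events):
        # current_state maps (type, state_key) -> event; keep only the
        # entries whose event_id appears in the computed auth chain.
        auth_ids = compute_auth_events(event, current_state)
        return {
            k: v
            for k, v in current_state.items()
            if v.event_id in auth_ids
        }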

View File

@@ -450,7 +450,7 @@ class DataStore(RoomMemberStore, RoomStore,
else:
args = (room_id, )
results = yield self._execute_and_decode("get_current_state", sql, *args)
results = yield self._execute_and_decode(sql, *args)
events = yield self._parse_events(results)
defer.returnValue(events)
@@ -475,7 +475,7 @@ class DataStore(RoomMemberStore, RoomStore,
sql += " OR s.type = 'm.room.aliases')"
args = (room_id,)
results = yield self._execute_and_decode("get_current_state", sql, *args)
results = yield self._execute_and_decode(sql, *args)
events = yield self._parse_events(results)
@@ -484,18 +484,17 @@ class DataStore(RoomMemberStore, RoomStore,
for e in events:
if e.type == 'm.room.name':
if 'name' in e.content:
name = e.content['name']
name = e.content['name']
elif e.type == 'm.room.aliases':
if 'aliases' in e.content:
aliases.extend(e.content['aliases'])
aliases.extend(e.content['aliases'])
defer.returnValue((name, aliases))
@defer.inlineCallbacks
def _get_min_token(self):
row = yield self._execute(
"_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
None,
"SELECT MIN(stream_ordering) FROM events"
)
self.min_token = row[0][0] if row and row[0] and row[0][0] else -1

View File

@@ -20,12 +20,10 @@ from synapse.events.utils import prune_event
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
from synapse.util.lrucache import LruCache
import synapse.metrics
from twisted.internet import defer
from collections import namedtuple, OrderedDict
import functools
import simplejson as json
import sys
import time
@@ -37,24 +35,9 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn")
metrics = synapse.metrics.get_metrics_for("synapse.storage")
sql_scheduling_timer = metrics.register_distribution("schedule_time")
sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"])
caches_by_name = {}
cache_counter = metrics.register_cache(
"cache",
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
labels=["name"],
)
# TODO(paul):
# * more generic key management
# * export monitoring stats
# * consider other eviction strategies - LRU?
def cached(max_entries=1000):
""" A method decorator that applies a memoizing cache around the function.
@@ -72,9 +55,6 @@ def cached(max_entries=1000):
"""
def wrap(orig):
cache = OrderedDict()
name = orig.__name__
caches_by_name[name] = cache
def prefill(key, value):
while len(cache) > max_entries:
@@ -82,14 +62,11 @@ def cached(max_entries=1000):
cache[key] = value
@functools.wraps(orig)
@defer.inlineCallbacks
def wrapped(self, key):
if key in cache:
cache_counter.inc_hits(name)
defer.returnValue(cache[key])
cache_counter.inc_misses(name)
ret = yield orig(self, key)
prefill(key, ret)
defer.returnValue(ret)
@@ -106,8 +83,7 @@ def cached(max_entries=1000):
class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging and metrics to the .execute()
method."""
passed to the constructor. Adds logging to the .execute() method."""
__slots__ = ["txn", "name"]
def __init__(self, txn, name):
@@ -123,7 +99,6 @@ class LoggingTransaction(object):
def execute(self, sql, *args, **kwargs):
# TODO(paul): Maybe use 'info' and 'debug' for values?
sql_logger.debug("[SQL] {%s} %s", self.name, sql)
try:
if args and args[0]:
values = args[0]
@@ -145,9 +120,8 @@ class LoggingTransaction(object):
logger.exception("[SQL FAIL] {%s}", self.name)
raise
finally:
msecs = (time.time() * 1000) - start
sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
sql_query_timer.inc_by(msecs, sql.split()[0])
end = time.time() * 1000
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
class PerformanceCounters(object):
@@ -198,18 +172,11 @@ class SQLBaseStore(object):
self._previous_txn_total_time = 0
self._current_txn_total_time = 0
self._previous_loop_ts = 0
# TODO(paul): These can eventually be removed once the metrics code
# is running in mainline, and we have some nice monitoring frontends
# to watch it
self._txn_perf_counters = PerformanceCounters()
self._get_event_counters = PerformanceCounters()
self._get_event_cache = LruCache(hs.config.event_cache_size)
# Pretend the getEventCache is just another named cache
caches_by_name["*getEvent*"] = self._get_event_cache
def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()
@@ -244,8 +211,6 @@ class SQLBaseStore(object):
"""Wraps the .runInteraction() method on the underlying db_pool."""
current_context = LoggingContext.current_context()
start_time = time.time() * 1000
def inner_func(txn, *args, **kwargs):
with LoggingContext("runInteraction") as context:
current_context.copy_to(context)
@@ -258,7 +223,6 @@ class SQLBaseStore(object):
name = "%s-%x" % (desc, txn_id, )
sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
transaction_logger.debug("[TXN START] {%s}", name)
try:
return func(LoggingTransaction(txn, name), *args, **kwargs)
@@ -267,13 +231,13 @@ class SQLBaseStore(object):
raise
finally:
end = time.time() * 1000
duration = end - start
transaction_logger.debug(
"[TXN END] {%s} %f",
name, end - start
)
transaction_logger.debug("[TXN END] {%s} %f", name, duration)
self._current_txn_total_time += duration
self._current_txn_total_time += end - start
self._txn_perf_counters.update(desc, start, end)
sql_txn_timer.inc_by(duration, desc)
with PreserveLoggingContext():
result = yield self._db_pool.runInteraction(
@@ -295,7 +259,7 @@ class SQLBaseStore(object):
)
return results
def _execute(self, desc, decoder, query, *args):
def _execute(self, decoder, query, *args):
"""Runs a single query for a result set.
Args:
@@ -313,10 +277,10 @@ class SQLBaseStore(object):
else:
return cursor.fetchall()
return self.runInteraction(desc, interaction)
return self.runInteraction("_execute", interaction)
def _execute_and_decode(self, desc, query, *args):
return self._execute(desc, self.cursor_to_dict, query, *args)
def _execute_and_decode(self, query, *args):
return self._execute(self.cursor_to_dict, query, *args)
# "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns.
@@ -674,22 +638,14 @@ class SQLBaseStore(object):
get_prev_content=False, allow_rejected=False):
start_time = time.time() * 1000
def update_counter(desc, last_time):
curr_time = self._get_event_counters.update(desc, last_time)
sql_getevents_timer.inc_by(curr_time - last_time, desc)
return curr_time
update_counter = self._get_event_counters.update
cache = self._get_event_cache.setdefault(event_id, {})
try:
# Separate cache entries for each way to invoke _get_event_txn
ret = cache[(check_redacted, get_prev_content, allow_rejected)]
cache_counter.inc_hits("*getEvent*")
return ret
return cache[(check_redacted, get_prev_content, allow_rejected)]
except KeyError:
cache_counter.inc_misses("*getEvent*")
pass
finally:
start_time = update_counter("event_cache", start_time)
@@ -729,11 +685,7 @@ class SQLBaseStore(object):
check_redacted=True, get_prev_content=False):
start_time = time.time() * 1000
def update_counter(desc, last_time):
curr_time = self._get_event_counters.update(desc, last_time)
sql_getevents_timer.inc_by(curr_time - last_time, desc)
return curr_time
update_counter = self._get_event_counters.update
d = json.loads(js)
start_time = update_counter("decode_json", start_time)
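
The metrics plumbing is the bulk of the churn in this file, but the
caching pattern underneath it is compact: one OrderedDict per decorated
method, a prefill that evicts the oldest insertions past max_entries, and
a deferred-aware wrapper. With the Twisted and metrics machinery stripped
out, the shape is roughly::

    import functools
    from collections import OrderedDict

    def cached(max_entries=1000):
        def wrap(orig):
            cache = OrderedDict()

            def prefill(key, value):
                while len(cache) >= max_entries:
                    cache.popitem(last=False)  # drop oldest insertion
                cache[key] = value

            @functools.wraps(orig)
            def wrapped(self, key):
                if key in cache:
                    return cache[key]      # the real one counts a hit here
                ret = orig(self, key)      # really yielded via inlineCallbacks
                prefill(key, ret)
                return ret
            return wrapped
        return wrap

    class Store(object):
        @cached(max_entries=2)
        def get_thing(self, key):
            return key.upper()

    s = Store()
    print(s.get_thing('a'))  # computed
    print(s.get_thing('a'))  # served from the cache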

View File

@@ -296,7 +296,7 @@ class ApplicationServiceStore(SQLBaseStore):
# }
# ]
services = {}
results = yield self._execute_and_decode("_populate_cache", sql)
results = yield self._execute_and_decode(sql)
for res in results:
as_token = res["token"]
if as_token not in services:

View File

@@ -429,15 +429,3 @@ class EventFederationStore(SQLBaseStore):
)
return events[:limit]
def clean_room_for_join(self, room_id):
return self.runInteraction(
"clean_room_for_join",
self._clean_room_for_join_txn,
room_id,
)
def _clean_room_for_join_txn(self, txn, room_id):
query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
txn.execute(query, (room_id,))

View File

@@ -37,7 +37,7 @@ class FeedbackStore(SQLBaseStore):
"WHERE feedback.target_event_id = ? "
)
rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id)
rows = yield self._execute_and_decode(sql, event_id)
defer.returnValue(
[

View File

@@ -85,9 +85,7 @@ class KeyStore(SQLBaseStore):
" AND key_id in (" + ",".join("?" for key_id in key_ids) + ")"
)
rows = yield self._execute_and_decode(
"get_server_verify_keys", sql, server_name, *key_ids
)
rows = yield self._execute_and_decode(sql, server_name, *key_ids)
keys = []
for row in rows:

View File

@@ -34,7 +34,7 @@ class PushRuleStore(SQLBaseStore):
"WHERE user_name = ? "
"ORDER BY priority_class DESC, priority DESC"
)
rows = yield self._execute("get_push_rules_for_user", None, sql, user_name)
rows = yield self._execute(None, sql, user_name)
dicts = []
for r in rows:
@@ -56,6 +56,17 @@ class PushRuleStore(SQLBaseStore):
{r['rule_id']: False if r['enabled'] == 0 else True for r in results}
)
@defer.inlineCallbacks
def get_push_rule_enabled_by_user_rule_id(self, user_name, rule_id):
results = yield self._simple_select_list(
PushRuleEnableTable.table_name,
{'user_name': user_name, 'rule_id': rule_id},
['enabled']
)
if not results:
defer.returnValue(True)
defer.returnValue(results[0])
@defer.inlineCallbacks
def add_push_rule(self, before, after, **kwargs):
vals = copy.copy(kwargs)
@@ -206,11 +217,17 @@ class PushRuleStore(SQLBaseStore):
@defer.inlineCallbacks
def set_push_rule_enabled(self, user_name, rule_id, enabled):
yield self._simple_upsert(
PushRuleEnableTable.table_name,
{'user_name': user_name, 'rule_id': rule_id},
{'enabled': enabled}
)
if enabled:
yield self._simple_delete_one(
PushRuleEnableTable.table_name,
{'user_name': user_name, 'rule_id': rule_id}
)
else:
yield self._simple_upsert(
PushRuleEnableTable.table_name,
{'user_name': user_name, 'rule_id': rule_id},
{'enabled': False}
)
class RuleNotFoundException(Exception):
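
One side of this hunk stores the flag as a row either way; the other
deletes the row to mean "enabled" and only writes a row for the disabled
state. With an in-memory dict standing in for PushRuleEnableTable, the two
behaviours compare as::

    table = {}

    def set_enabled_upsert(user_name, rule_id, enabled):
        # one row per (user, rule), recording both states explicitly
        table[(user_name, rule_id)] = {'enabled': enabled}

    def set_enabled_sparse(user_name, rule_id, enabled):
        # absence of a row means "enabled"
        if enabled:
            table.pop((user_name, rule_id), None)
        else:
            table[(user_name, rule_id)] = {'enabled': False}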

View File

@@ -37,8 +37,7 @@ class PusherStore(SQLBaseStore):
)
rows = yield self._execute(
"get_pushers_by_app_id_and_pushkey", None, sql,
app_id_and_pushkey[0], app_id_and_pushkey[1]
None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1]
)
ret = [
@@ -71,7 +70,7 @@ class PusherStore(SQLBaseStore):
"FROM pushers"
)
rows = yield self._execute("get_all_pushers", None, sql)
rows = yield self._execute(None, sql)
ret = [
{

View File

@@ -19,7 +19,7 @@ from sqlite3 import IntegrityError
from synapse.api.errors import StoreError, Codes
from ._base import SQLBaseStore, cached
from ._base import SQLBaseStore
class RegistrationStore(SQLBaseStore):
@@ -88,14 +88,10 @@ class RegistrationStore(SQLBaseStore):
query = ("SELECT users.name, users.password_hash FROM users"
" WHERE users.name = ?")
return self._execute(
"get_user_by_id", self.cursor_to_dict, query, user_id
self.cursor_to_dict,
query, user_id
)
@cached()
# TODO(paul): Currently there's no code to invalidate this cache. That
# means if/when we ever add internal ways to invalidate access tokens or
# change whether a user is a server admin, those will need to invoke
# store.get_user_by_token.invalidate(token)
def get_user_by_token(self, token):
"""Get a user from the given access token.

View File

@@ -68,7 +68,7 @@ class RoomStore(SQLBaseStore):
"""
query = RoomsTable.select_statement("room_id=?")
return self._execute(
"get_room", RoomsTable.decode_single_result, query, room_id,
RoomsTable.decode_single_result, query, room_id,
)
@defer.inlineCallbacks

View File

@@ -82,7 +82,7 @@ class StateStore(SQLBaseStore):
if context.current_state is None:
return
state_events = dict(context.current_state)
state_events = context.current_state
if event.is_state():
state_events[(event.type, event.state_key)] = event

View File

@@ -16,6 +16,7 @@
class LruCache(object):
"""Least-recently-used cache."""
# TODO(mjark) Add hit/miss counters
# TODO(mjark) Add mutex for linked list for thread safety.
def __init__(self, max_size):
cache = {}
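
As the TODO above notes, the class body below this hunk does its eviction
with a linked list over a plain dict. The observable behaviour can be
reproduced with an OrderedDict (a simplification, not the module's
implementation)::

    from collections import OrderedDict

    class SimpleLru(object):
        def __init__(self, max_size):
            self.max_size = max_size
            self.data = OrderedDict()

        def __setitem__(self, key, value):
            self.data.pop(key, None)
            self.data[key] = value
            if len(self.data) > self.max_size:
                self.data.popitem(last=False)  # evict least recently used

        def __getitem__(self, key):
            value = self.data.pop(key)         # KeyError on a miss
            self.data[key] = value             # touch: now most recent
            return value

    c = SimpleLru(2)
    c['a'] = 1; c['b'] = 2; c['a']; c['c'] = 3  # 'b' is evicted
    print('b' in c.data)  # False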

View File

@@ -16,10 +16,6 @@
import random
import string
_string_with_symbols = (
string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
)
def origin_from_ucid(ucid):
return ucid.split("@", 1)[1]
@@ -27,9 +23,3 @@ def origin_from_ucid(ucid):
def random_string(length):
return ''.join(random.choice(string.ascii_letters) for _ in xrange(length))
def random_string_with_symbols(length):
return ''.join(
random.choice(_string_with_symbols) for _ in xrange(length)
)

286 synapse/util/traceutil.py Normal file
View File

@@ -0,0 +1,286 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import inspect
import logging
logger = logging.getLogger("Tracer")
class Tracer(object):
def __init__(self):
self.interested_deferreds = set()
self.next_id = 1
self.deferred_frames = {}
self.deferred_to_current_frames = {}
def process(self, frame, event, arg):
if event == 'call':
return self.process_call(frame)
def handle_inline_callbacks(self, frm):
argvalues = inspect.getargvalues(frm)
generator = argvalues.locals["g"]
deferred = argvalues.locals["deferred"]
if not hasattr(deferred, "syn_trace_defer_id"):
trace_id = self.get_next_id()
deferred.syn_trace_defer_id = trace_id
logger.info(
"%s named %s",
trace_id,
self.get_name_for_frame(generator.gi_frame)
)
logger.info(
"%s is deferred",
trace_id,
)
logger.info("%s start %d", trace_id, int(time.time() * 1000))
def do(res):
logger.info("%s end %d", trace_id, int(time.time() * 1000))
return res
deferred.addBoth(do)
back = frm.f_back
while back:
try:
name = self.get_name_for_frame(back)
if name == "twisted.internet.defer._inlineCallbacks":
argvalues = inspect.getargvalues(back)
deferred = argvalues.locals["deferred"]
d_id = getattr(deferred, "syn_trace_defer_id", None)
if d_id:
logger.info("%s in %s", trace_id, d_id)
curr_stack = self.deferred_to_current_frames.setdefault(
d_id, []
)
if curr_stack:
logger.info("%s calls %s", curr_stack[-1], trace_id)
else:
logger.info("%s calls %s", d_id, trace_id)
break
except:
pass
back = back.f_back
def are_interested(self, name):
if not name.startswith("synapse"):
return False
if name.startswith("synapse.util.logcontext"):
return False
if name.startswith("synapse.util.logutils"):
return False
if name.startswith("synapse.util.traceutil"):
return False
if name.startswith("synapse.events.FrozenEvent.get"):
return False
if name.startswith("synapse.events.EventBuilder.get"):
return False
if name.startswith("synapse.types"):
return False
if name.startswith("synapse.util.frozenutils.freeze"):
return False
if name.startswith("synapse.util.frozenutils.<dictcomp>"):
return False
if name.startswith("synapse.util.Clock"):
return False
if name.endswith("__repr__") or name.endswith("__str__"):
return False
if name.endswith("<genexpr>"):
return False
return True
def process_call(self, frame):
should_trace = False
try:
name = self.get_name_for_frame(frame)
if name == "twisted.internet.defer._inlineCallbacks":
self.handle_inline_callbacks(frame)
return
if not self.are_interested(name):
return
back_name = self.get_name_for_frame(frame.f_back)
if name == "synapse.api.auth.Auth.get_user_by_req":
logger.info(
"synapse.api.auth.Auth.get_user_by_req %s",
back_name
)
try:
if back_name == "twisted.internet.defer._inlineCallbacks":
def ret(f, event, result):
if event != "return":
return
argvalues = inspect.getargvalues(frame.f_back)
deferred = argvalues.locals["deferred"]
try:
logger.info(
"%s waits on %s",
deferred.syn_trace_defer_id,
result.syn_trace_defer_id
)
except:
pass
return ret
if back_name == "twisted.internet.defer.unwindGenerator":
return
except:
pass
try:
func = getattr(frame.f_locals["self"], frame.f_code.co_name)
if inspect.isgeneratorfunction(func):
return
except:
pass
try:
func = frame.f_globals[frame.f_code.co_name]
if inspect.isgeneratorfunction(func):
return
except:
pass
except:
return
back = frame
names = []
seen_deferreds = []
bottom_deferred = None
while back:
try:
name = self.get_name_for_frame(back)
if name.startswith("synapse"):
names.append(name)
# if name.startswith("twisted.internet.defer"):
# logger.info("Name: %s", name)
if name == "twisted.internet.defer._inlineCallbacks":
argvalues = inspect.getargvalues(back)
deferred = argvalues.locals["deferred"]
d_id = getattr(deferred, "syn_trace_defer_id", None)
if d_id:
seen_deferreds.append(d_id)
if not bottom_deferred:
bottom_deferred = deferred
if d_id in self.interested_deferreds:
should_trace = True
break
func = getattr(back.f_locals["self"], back.f_code.co_name)
if hasattr(func, "should_trace") or hasattr(func.im_func, "should_trace"):
should_trace = True
break
func.root_trace
# Bare attribute access: raises AttributeError (swallowed by the
# enclosing except) unless @trace set root_trace on this function.
should_trace = True
break
except:
pass
back = back.f_back
if not should_trace:
return
frame_id = self.get_next_id()
name = self.get_name_for_frame(frame)
logger.info("%s named %s", frame_id, name)
self.interested_deferreds.update(seen_deferreds)
names.reverse()
if bottom_deferred:
self.deferred_frames.setdefault(
bottom_deferred.syn_trace_defer_id, []
).append(names)
logger.info("%s in %s", frame_id, bottom_deferred.syn_trace_defer_id)
if not hasattr(bottom_deferred, "syn_trace_registered_cb"):
bottom_deferred.syn_trace_registered_cb = True
def do(res):
return res
bottom_deferred.addBoth(do)
curr_stack = self.deferred_to_current_frames.setdefault(
bottom_deferred.syn_trace_defer_id, []
)
if curr_stack:
logger.info("%s calls %s", curr_stack[-1], frame_id)
else:
logger.info("%s calls %s", bottom_deferred.syn_trace_defer_id, frame_id)
curr_stack.append(frame_id)
logger.info("%s start %d", frame_id, int(time.time() * 1000))
def p(frame, event, arg):
if event == "return":
curr_stack.pop()
logger.info("%s end %d", frame_id, int(time.time() * 1000))
return p
def get_name_for_frame(self, frame):
module_name = frame.f_globals["__name__"]
cls_instance = frame.f_locals.get("self", None)
if cls_instance:
cls_name = cls_instance.__class__.__name__
name = "%s.%s.%s" % (
module_name, cls_name, frame.f_code.co_name
)
else:
name = "%s.%s" % (
module_name, frame.f_code.co_name
)
return name
def get_next_id(self):
i = self.next_id
self.next_id += 1
return i
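
Nothing in the diff shows how the tracer gets installed, but
process(frame, event, arg) returning per-frame callbacks is exactly the
contract of sys.settrace, so a plausible wiring (an assumption, not part
of this change) is::

    import logging
    import sys

    from synapse.util.traceutil import Tracer

    # the tracer logs its call graph at INFO on the "Tracer" logger
    logging.basicConfig(level=logging.INFO)

    # settrace only affects the calling thread; threads started afterwards
    # would need threading.settrace as well.
    sys.settrace(Tracer().process)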

View File

@@ -1,161 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests import unittest
from synapse.metrics.metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
)
class CounterMetricTestCase(unittest.TestCase):
def test_scalar(self):
counter = CounterMetric("scalar")
self.assertEquals(counter.render(), [
'scalar 0',
])
counter.inc()
self.assertEquals(counter.render(), [
'scalar 1',
])
counter.inc_by(2)
self.assertEquals(counter.render(), [
'scalar 3'
])
def test_vector(self):
counter = CounterMetric("vector", labels=["method"])
# Empty counter doesn't yet know what values it has
self.assertEquals(counter.render(), [])
counter.inc("GET")
self.assertEquals(counter.render(), [
'vector{method="GET"} 1',
])
counter.inc("GET")
counter.inc("PUT")
self.assertEquals(counter.render(), [
'vector{method="GET"} 2',
'vector{method="PUT"} 1',
])
class CallbackMetricTestCase(unittest.TestCase):
def test_scalar(self):
d = dict()
metric = CallbackMetric("size", lambda: len(d))
self.assertEquals(metric.render(), [
'size 0',
])
d["key"] = "value"
self.assertEquals(metric.render(), [
'size 1',
])
def test_vector(self):
vals = dict()
metric = CallbackMetric("values", lambda: vals, labels=["type"])
self.assertEquals(metric.render(), [])
# Keys have to be tuples, even if they're 1-element
vals[("foo",)] = 1
vals[("bar",)] = 2
self.assertEquals(metric.render(), [
'values{type="bar"} 2',
'values{type="foo"} 1',
])
class DistributionMetricTestCase(unittest.TestCase):
def test_scalar(self):
metric = DistributionMetric("thing")
self.assertEquals(metric.render(), [
'thing:count 0',
'thing:total 0',
])
metric.inc_by(500)
self.assertEquals(metric.render(), [
'thing:count 1',
'thing:total 500',
])
def test_vector(self):
metric = DistributionMetric("queries", labels=["verb"])
self.assertEquals(metric.render(), [])
metric.inc_by(300, "SELECT")
metric.inc_by(200, "SELECT")
metric.inc_by(800, "INSERT")
self.assertEquals(metric.render(), [
'queries:count{verb="INSERT"} 1',
'queries:count{verb="SELECT"} 2',
'queries:total{verb="INSERT"} 800',
'queries:total{verb="SELECT"} 500',
])
class CacheMetricTestCase(unittest.TestCase):
def test_cache(self):
d = dict()
metric = CacheMetric("cache", lambda: len(d))
self.assertEquals(metric.render(), [
'cache:hits 0',
'cache:total 0',
'cache:size 0',
])
metric.inc_misses()
d["key"] = "value"
self.assertEquals(metric.render(), [
'cache:hits 0',
'cache:total 1',
'cache:size 1',
])
metric.inc_hits()
self.assertEquals(metric.render(), [
'cache:hits 1',
'cache:total 2',
'cache:size 1',
])
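
The deleted tests double as a specification of the exposition format:
scalar metrics render as "name value", and labelled ones render one
'name{label="value"} count' line per label value seen, sorted. A minimal
counter satisfying the asserted behaviour (a reconstruction, not the
removed synapse/metrics/metric.py)::

    class CounterMetric(object):
        def __init__(self, name, labels=None):
            self.name = name
            self.labels = labels or []
            self.counts = {}

        def inc_by(self, incr, *values):
            self.counts[values] = self.counts.get(values, 0) + incr

        def inc(self, *values):
            self.inc_by(1, *values)

        def render(self):
            if not self.labels:
                return ['%s %d' % (self.name, self.counts.get((), 0))]
            return [
                '%s{%s} %d' % (
                    self.name,
                    ','.join('%s="%s"' % kv for kv in zip(self.labels, values)),
                    count,
                )
                for values, count in sorted(self.counts.items())
            ]

    c = CounterMetric('vector', labels=['method'])
    c.inc('GET'); c.inc('GET'); c.inc('PUT')
    print(c.render())  # ['vector{method="GET"} 2', 'vector{method="PUT"} 1']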