1
0

Compare commits

..

1 Commits

Author SHA1 Message Date
Erik Johnston
80a22daf60 Add some tests for federation url encoding 2018-04-10 11:34:50 +01:00
62 changed files with 406 additions and 627 deletions

View File

@@ -1,58 +1,3 @@
Changes in synapse v0.27.3-rc2 (2018-04-09)
==========================================
v0.27.3-rc1 used a stale version of the develop branch so the changelog overstates
the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored.
Changes in synapse v0.27.3-rc1 (2018-04-09)
=======================================
Notable changes include API support for joinability of groups. Also new metrics
and phone home stats. Phone home stats include better visibility of system usage
so we can tweak synpase to work better for all users rather than our own experience
with matrix.org. Also, recording 'r30' stat which is the measure we use to track
overal growth of the Matrix ecosystem. It is defined as:-
Counts the number of native 30 day retained users, defined as:-
* Users who have created their accounts more than 30 days
* Where last seen at most 30 days ago
* Where account creation and last_seen are > 30 days"
Features:
* Add joinability for groups (PR #3045)
* Implement group join API (PR #3046)
* Add counter metrics for calculating state delta (PR #3033)
* R30 stats (PR #3041)
* Measure time it takes to calculate state group ID (PR #3043)
* Add basic performance statistics to phone home (PR #3044)
* Add response size metrics (PR #3071)
* phone home cache size configurations (PR #3063)
Changes:
* Add a blurb explaining the main synapse worker (PR #2886) Thanks to @turt2live!
* Replace old style error catching with 'as' keyword (PR #3000) Thanks to @NotAFile!
* Use .iter* to avoid copies in StateHandler (PR #3006)
* Linearize calls to _generate_user_id (PR #3029)
* Remove last usage of ujson (PR #3030)
* Use simplejson throughout (PR #3048)
* Use static JSONEncoders (PR #3049)
* Remove uses of events.content (PR #3060)
* Improve database cache performance (PR #3068)
Bug fixes:
* Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
* Fix replication after switch to simplejson (PR #3015)
* Fix replication after switch to simplejson (PR #3015)
* 404 correctly on missing paths via NoResource (PR #3022)
* Fix error when claiming e2e keys from offline servers (PR #3034)
* fix tests/storage/test_user_directory.py (PR #3042)
* use PUT instead of POST for federating groups/m.join_policy (PR #3070) Thanks to @krombel!
* postgres port script: fix state_groups_pkey error (PR #3072)
Changes in synapse v0.27.2 (2018-03-26)
=======================================

View File

@@ -22,8 +22,6 @@ import argparse
from synapse.events import FrozenEvent
from synapse.util.frozenutils import unfreeze
from six import string_types
def make_graph(file_name, room_id, file_prefix, limit):
print "Reading lines"
@@ -60,7 +58,7 @@ def make_graph(file_name, room_id, file_prefix, limit):
for key, value in unfreeze(event.get_dict()["content"]).items():
if value is None:
value = "<null>"
elif isinstance(value, string_types):
elif isinstance(value, basestring):
pass
else:
value = json.dumps(value)

View File

@@ -202,11 +202,11 @@ new PromConsole.Graph({
<h1>Requests</h1>
<h3>Requests by Servlet</h3>
<div id="synapse_http_server_request_count_servlet"></div>
<div id="synapse_http_server_requests_servlet"></div>
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_request_count_servlet"),
expr: "rate(synapse_http_server_request_count:servlet[2m])",
node: document.querySelector("#synapse_http_server_requests_servlet"),
expr: "rate(synapse_http_server_requests:servlet[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -215,11 +215,11 @@ new PromConsole.Graph({
})
</script>
<h4>&nbsp;(without <tt>EventStreamRestServlet</tt> or <tt>SyncRestServlet</tt>)</h4>
<div id="synapse_http_server_request_count_servlet_minus_events"></div>
<div id="synapse_http_server_requests_servlet_minus_events"></div>
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_request_count_servlet_minus_events"),
expr: "rate(synapse_http_server_request_count:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
node: document.querySelector("#synapse_http_server_requests_servlet_minus_events"),
expr: "rate(synapse_http_server_requests:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -233,7 +233,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_time_avg"),
expr: "rate(synapse_http_server_response_time_seconds[2m]) / rate(synapse_http_server_response_count[2m]) / 1000",
expr: "rate(synapse_http_server_response_time:total[2m]) / rate(synapse_http_server_response_time:count[2m]) / 1000",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -276,7 +276,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_ru_utime"),
expr: "rate(synapse_http_server_response_ru_utime_seconds[2m])",
expr: "rate(synapse_http_server_response_ru_utime:total[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -291,7 +291,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_db_txn_duration"),
expr: "rate(synapse_http_server_response_db_txn_duration_seconds[2m])",
expr: "rate(synapse_http_server_response_db_txn_duration:total[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -306,7 +306,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_send_time_avg"),
expr: "rate(synapse_http_server_response_time_second{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
expr: "rate(synapse_http_server_response_time:total{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_time:count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,

View File

@@ -1,10 +1,10 @@
synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)
synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)
synapse_http_server_request_count:method{servlet=""} = sum(synapse_http_server_request_count) by (method)
synapse_http_server_request_count:servlet{method=""} = sum(synapse_http_server_request_count) by (servlet)
synapse_http_server_requests:method{servlet=""} = sum(synapse_http_server_requests) by (method)
synapse_http_server_requests:servlet{method=""} = sum(synapse_http_server_requests) by (servlet)
synapse_http_server_request_count:total{servlet=""} = sum(synapse_http_server_request_count:by_method) by (servlet)
synapse_http_server_requests:total{servlet=""} = sum(synapse_http_server_requests:by_method) by (servlet)
synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])
synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])

View File

@@ -5,19 +5,19 @@ groups:
expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)"
- record: "synapse_federation_transaction_queue_pendingPdus:total"
expr: "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)"
- record: 'synapse_http_server_request_count:method'
- record: 'synapse_http_server_requests:method'
labels:
servlet: ""
expr: "sum(synapse_http_server_request_count) by (method)"
- record: 'synapse_http_server_request_count:servlet'
expr: "sum(synapse_http_server_requests) by (method)"
- record: 'synapse_http_server_requests:servlet'
labels:
method: ""
expr: 'sum(synapse_http_server_request_count) by (servlet)'
expr: 'sum(synapse_http_server_requests) by (servlet)'
- record: 'synapse_http_server_request_count:total'
- record: 'synapse_http_server_requests:total'
labels:
servlet: ""
expr: 'sum(synapse_http_server_request_count:by_method) by (servlet)'
expr: 'sum(synapse_http_server_requests:by_method) by (servlet)'
- record: 'synapse_cache:hit_ratio_5m'
expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])'

View File

@@ -30,8 +30,6 @@ import time
import traceback
import yaml
from six import string_types
logger = logging.getLogger("synapse_port_db")
@@ -576,7 +574,7 @@ class Porter(object):
def conv(j, col):
if j in bool_cols:
return bool(col)
elif isinstance(col, string_types) and "\0" in col:
elif isinstance(col, basestring) and "\0" in col:
logger.warn("DROPPING ROW: NUL value in table %s col %s: %r", table, headers[j], col)
raise BadValueException();
return col

View File

@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
__version__ = "0.27.3-rc2"
__version__ = "0.27.2"

View File

@@ -204,8 +204,8 @@ class Auth(object):
ip_addr = self.hs.get_ip_from_request(request)
user_agent = request.requestHeaders.getRawHeaders(
b"User-Agent",
default=[b""]
"User-Agent",
default=[""]
)[0]
if user and access_token and ip_addr:
self.store.insert_client_ip(
@@ -672,7 +672,7 @@ def has_access_token(request):
bool: False if no access_token was given, True otherwise.
"""
query_params = request.args.get("access_token")
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
auth_headers = request.requestHeaders.getRawHeaders("Authorization")
return bool(query_params) or bool(auth_headers)
@@ -692,8 +692,8 @@ def get_access_token_from_request(request, token_not_found_http_status=401):
AuthError: If there isn't an access_token in the request.
"""
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
query_params = request.args.get(b"access_token")
auth_headers = request.requestHeaders.getRawHeaders("Authorization")
query_params = request.args.get("access_token")
if auth_headers:
# Try the get the access_token from a "Authorization: Bearer"
# header

View File

@@ -36,7 +36,6 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -50,35 +49,6 @@ from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.frontend_proxy")
class PresenceStatusStubServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
def __init__(self, hs):
super(PresenceStatusStubServlet, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.auth = hs.get_auth()
self.main_uri = hs.config.worker_main_http_uri
@defer.inlineCallbacks
def on_GET(self, request, user_id):
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
headers = {
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri,
headers=headers,
)
defer.returnValue((200, result))
@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
defer.returnValue((200, {}))
class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -120,7 +90,7 @@ class KeyUploadServlet(RestServlet):
# They're actually trying to upload something, proxy to main synapse.
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
headers = {
"Authorization": auth_headers,
}
@@ -165,7 +135,6 @@ class FrontendProxyServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
PresenceStatusStubServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,

View File

@@ -113,7 +113,6 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
return
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
def mark_as_coming_online(self, user_id):
@@ -211,8 +210,6 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
# presence is disabled on matrix.org, so we return the empty set
return set()
return [
user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
if count > 0

View File

@@ -108,7 +108,7 @@ def stop(pidfile, app):
Worker = collections.namedtuple("Worker", [
"app", "configfile", "pidfile", "cache_factor", "cache_factors",
"app", "configfile", "pidfile", "cache_factor"
])
@@ -171,10 +171,6 @@ def main():
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
for cache_name, factor in cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
if options.worker:
start_stop_synapse = False
@@ -215,10 +211,6 @@ def main():
or pidfile
)
worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
worker_cache_factors = (
worker_config.get("synctl_cache_factors")
or cache_factors
)
daemonize = worker_config.get("daemonize") or config.get("daemonize")
assert daemonize, "Main process must have daemonize set to true"
@@ -234,10 +226,8 @@ def main():
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
worker_configfile, "worker_daemonize")
worker_cache_factor = worker_config.get("synctl_cache_factor")
worker_cache_factors = worker_config.get("synctl_cache_factors", {})
workers.append(Worker(
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
worker_cache_factors,
))
action = options.action
@@ -272,19 +262,15 @@ def main():
start(configfile)
for worker in workers:
env = os.environ.copy()
if worker.cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
for cache_name, factor in worker.cache_factors.iteritems():
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
start_worker(worker.app, configfile, worker.configfile)
# Reset env back to the original
os.environ.clear()
os.environ.update(env)
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
else:
os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
if __name__ == "__main__":

View File

@@ -21,8 +21,6 @@ from twisted.internet import defer
import logging
import re
from six import string_types
logger = logging.getLogger(__name__)
@@ -148,7 +146,7 @@ class ApplicationService(object):
)
regex = regex_obj.get("regex")
if isinstance(regex, string_types):
if isinstance(regex, basestring):
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
else:
raise ValueError(

View File

@@ -73,8 +73,7 @@ class ApplicationServiceApi(SimpleHttpClient):
super(ApplicationServiceApi, self).__init__(hs)
self.clock = hs.get_clock()
self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
timeout_ms=HOUR_IN_MS)
self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
@defer.inlineCallbacks
def query_user(self, service, user_id):

View File

@@ -19,8 +19,6 @@ import os
import yaml
from textwrap import dedent
from six import integer_types
class ConfigError(Exception):
pass
@@ -51,7 +49,7 @@ Missing mandatory `server_name` config option.
class Config(object):
@staticmethod
def parse_size(value):
if isinstance(value, integer_types):
if isinstance(value, int) or isinstance(value, long):
return value
sizes = {"K": 1024, "M": 1024 * 1024}
size = 1
@@ -63,7 +61,7 @@ class Config(object):
@staticmethod
def parse_duration(value):
if isinstance(value, integer_types):
if isinstance(value, int) or isinstance(value, long):
return value
second = 1000
minute = 60 * second
@@ -290,22 +288,22 @@ class Config(object):
)
obj.invoke_all("generate_files", config)
config_file.write(config_bytes)
print((
print (
"A config file has been generated in %r for server name"
" %r with corresponding SSL keys and self-signed"
" certificates. Please review this file and customise it"
" to your needs."
) % (config_path, server_name))
print(
) % (config_path, server_name)
print (
"If this server name is incorrect, you will need to"
" regenerate the SSL certificates"
)
return
else:
print((
print (
"Config file %r already exists. Generating any missing key"
" files."
) % (config_path,))
) % (config_path,)
generate_keys = True
parser = argparse.ArgumentParser(

View File

@@ -21,8 +21,6 @@ import urllib
import yaml
import logging
from six import string_types
logger = logging.getLogger(__name__)
@@ -91,14 +89,14 @@ def _load_appservice(hostname, as_info, config_filename):
"id", "as_token", "hs_token", "sender_localpart"
]
for field in required_string_fields:
if not isinstance(as_info.get(field), string_types):
if not isinstance(as_info.get(field), basestring):
raise KeyError("Required string field: '%s' (%s)" % (
field, config_filename,
))
# 'url' must either be a string or explicitly null, not missing
# to avoid accidentally turning off push for ASes.
if (not isinstance(as_info.get("url"), string_types) and
if (not isinstance(as_info.get("url"), basestring) and
as_info.get("url", "") is not None):
raise KeyError(
"Required string field or explicit null: 'url' (%s)" % (config_filename,)
@@ -130,7 +128,7 @@ def _load_appservice(hostname, as_info, config_filename):
"Expected namespace entry in %s to be an object,"
" but got %s", ns, regex_obj
)
if not isinstance(regex_obj.get("regex"), string_types):
if not isinstance(regex_obj.get("regex"), basestring):
raise ValueError(
"Missing/bad type 'regex' key in %s", regex_obj
)

View File

@@ -65,7 +65,7 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
@defer.inlineCallbacks
@log_function

View File

@@ -169,7 +169,7 @@ class TransactionQueue(object):
while True:
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100,
last_token, self._last_poked_id, limit=20,
)
logger.debug("Handling %s -> %s", last_token, next_token)
@@ -177,33 +177,24 @@ class TransactionQueue(object):
if not events and next_token >= self._last_poked_id:
break
@defer.inlineCallbacks
def handle_event(event):
for event in events:
# Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.event_id)
if not is_mine and send_on_behalf_of is None:
return
try:
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
)
except Exception:
logger.exception(
"Failed to calculate hosts in room for event: %s",
event.event_id,
)
return
continue
# Get the state from before the event.
# We need to make sure that this is the state from before
# the event and not from after it.
# Otherwise if the last member on a server in a room is
# banned then it won't receive the event because it won't
# be in the room after the ban.
destinations = yield self.state.get_current_hosts_in_room(
event.room_id, latest_event_ids=[
prev_id for prev_id, _ in event.prev_events
],
)
destinations = set(destinations)
if send_on_behalf_of is not None:
@@ -216,44 +207,12 @@ class TransactionQueue(object):
self._send_pdu(event, destinations)
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
],
consumeErrors=True
))
events_processed_counter.inc_by(len(events))
yield self.store.update_federation_out_pos(
"events", next_token
)
if events:
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.set(
now - ts, "federation_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "federation_sender",
)
events_processed_counter.inc_by(len(events))
synapse.metrics.event_processing_positions.set(
next_token, "federation_sender",
)
finally:
self._is_processing = False
@@ -295,7 +254,6 @@ class TransactionQueue(object):
Args:
states (list(UserPresenceState))
"""
return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled

View File

@@ -18,9 +18,7 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
from synapse.util.logcontext import (
make_deferred_yieldable, preserve_fn, run_in_background,
)
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
import logging
@@ -86,16 +84,11 @@ class ApplicationServicesHandler(object):
if not events:
break
events_by_room = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@defer.inlineCallbacks
def handle_event(event):
# Gather interested services
services = yield self._get_services_for_event(event)
if len(services) == 0:
return # no services need notifying
continue # no services need notifying
# Do we know this user exists? If not, poke the user
# query API for all services which match that user regex.
@@ -115,33 +108,9 @@ class ApplicationServicesHandler(object):
service, event
)
@defer.inlineCallbacks
def handle_room_events(events):
for event in events:
yield handle_event(event)
yield make_deferred_yieldable(defer.gatherResults([
run_in_background(handle_room_events, evs)
for evs in events_by_room.itervalues()
], consumeErrors=True))
yield self.store.set_appservice_last_pos(upper_bound)
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_positions.set(
upper_bound, "appservice_sender",
)
events_processed_counter.inc_by(len(events))
synapse.metrics.event_processing_lag.set(
now - ts, "appservice_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "appservice_sender",
)
yield self.store.set_appservice_last_pos(upper_bound)
finally:
self.is_processing = False

View File

@@ -372,7 +372,6 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
defer.returnValue([])
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,

View File

@@ -384,7 +384,7 @@ class MessageHandler(BaseHandler):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or becuase there
# is a user in the room that the AS is "interested in"
if False and requester.app_service and user_id not in users_with_profile:
if requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
@@ -454,39 +454,40 @@ class EventCreationHandler(object):
"""
builder = self.event_builder_factory.new(event_dict)
self.validator.validate_new(builder)
with (yield self.limiter.queue(builder.room_id)):
self.validator.validate_new(builder)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
target = UserID.from_string(builder.state_key)
if builder.type == EventTypes.Member:
membership = builder.content.get("membership", None)
target = UserID.from_string(builder.state_key)
if membership in {Membership.JOIN, Membership.INVITE}:
# If event doesn't include a display name, add one.
profile = self.profile_handler
content = builder.content
if membership in {Membership.JOIN, Membership.INVITE}:
# If event doesn't include a display name, add one.
profile = self.profile_handler
content = builder.content
try:
if "displayname" not in content:
content["displayname"] = yield profile.get_displayname(target)
if "avatar_url" not in content:
content["avatar_url"] = yield profile.get_avatar_url(target)
except Exception as e:
logger.info(
"Failed to get profile information for %r: %s",
target, e
)
try:
if "displayname" not in content:
content["displayname"] = yield profile.get_displayname(target)
if "avatar_url" not in content:
content["avatar_url"] = yield profile.get_avatar_url(target)
except Exception as e:
logger.info(
"Failed to get profile information for %r: %s",
target, e
)
if token_id is not None:
builder.internal_metadata.token_id = token_id
if token_id is not None:
builder.internal_metadata.token_id = token_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
event, context = yield self.create_new_client_event(
builder=builder,
requester=requester,
prev_event_ids=prev_event_ids,
)
event, context = yield self.create_new_client_event(
builder=builder,
requester=requester,
prev_event_ids=prev_event_ids,
)
defer.returnValue((event, context))
@@ -556,34 +557,27 @@ class EventCreationHandler(object):
See self.create_event and self.send_nonmember_event.
"""
event, context = yield self.create_event(
requester,
event_dict,
token_id=requester.access_token_id,
txn_id=txn_id
)
# We limit the number of concurrent event sends in a room so that we
# don't fork the DAG too much. If we don't limit then we can end up in
# a situation where event persistence can't keep up, causing
# extremities to pile up, which in turn leads to state resolution
# taking longer.
with (yield self.limiter.queue(event_dict["room_id"])):
event, context = yield self.create_event(
requester,
event_dict,
token_id=requester.access_token_id,
txn_id=txn_id
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, basestring):
spam_error = "Spam is not permitted here"
raise SynapseError(
403, spam_error, Codes.FORBIDDEN
)
spam_error = self.spam_checker.check_event_for_spam(event)
if spam_error:
if not isinstance(spam_error, basestring):
spam_error = "Spam is not permitted here"
raise SynapseError(
403, spam_error, Codes.FORBIDDEN
)
yield self.send_nonmember_event(
requester,
event,
context,
ratelimit=ratelimit,
)
yield self.send_nonmember_event(
requester,
event,
context,
ratelimit=ratelimit,
)
defer.returnValue(event)
@measure_func("create_new_client_event")

View File

@@ -373,7 +373,6 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
return
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -403,7 +402,6 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
affect_presence = False
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -446,8 +444,6 @@ class PresenceHandler(object):
Returns:
set(str): A set of user_id strings.
"""
# presence is disabled on matrix.org, so we return the empty set
return set()
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
@@ -467,7 +463,6 @@ class PresenceHandler(object):
syncing_user_ids(set(str)): The set of user_ids that are
currently syncing on that server.
"""
return
# Grab the previous list of user_ids that were syncing on that process
prev_syncing_user_ids = (

View File

@@ -23,7 +23,7 @@ from synapse.api.errors import (
)
from synapse.http.client import CaptchaServerHttpClient
from synapse import types
from synapse.types import UserID, create_requester, RoomID, RoomAlias
from synapse.types import UserID
from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
@@ -205,17 +205,10 @@ class RegistrationHandler(BaseHandler):
token = None
attempts += 1
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = create_requester(user_id)
for r in self.hs.config.auto_join_rooms:
try:
yield self._join_user_to_room(fake_requester, r)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
# We used to generate default identicons here, but nowadays
# we want clients to generate their own as part of their branding
# rather than there being consistent matrix-wide ones, so we don't.
defer.returnValue((user_id, token))
@defer.inlineCallbacks
@@ -490,28 +483,3 @@ class RegistrationHandler(BaseHandler):
)
defer.returnValue((user_id, access_token))
@defer.inlineCallbacks
def _join_user_to_room(self, requester, room_identifier):
room_id = None
room_member_handler = self.hs.get_room_member_handler()
if RoomID.is_valid(room_identifier):
room_id = room_identifier
elif RoomAlias.is_valid(room_identifier):
room_alias = RoomAlias.from_string(room_identifier)
room_id, remote_room_hosts = (
yield room_member_handler.lookup_room_alias(room_alias)
)
room_id = room_id.to_string()
else:
raise SynapseError(400, "%s was not legal room ID or room alias" % (
room_identifier,
))
yield room_member_handler.update_membership(
requester=requester,
target=requester.user,
room_id=room_id,
remote_room_hosts=remote_room_hosts,
action="join",
)

View File

@@ -44,9 +44,8 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(hs, "remote_room_list",
timeout_ms=30 * 1000)
self.response_cache = ResponseCache(hs)
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,

View File

@@ -28,7 +28,7 @@ from synapse.api.constants import (
)
from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.types import UserID, RoomID
from synapse.util.async import Linearizer, Limiter
from synapse.util.async import Linearizer
from synapse.util.distributor import user_left_room, user_joined_room
@@ -60,7 +60,6 @@ class RoomMemberHandler(object):
self.event_creation_hander = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
self.member_limiter = Limiter(20)
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -233,23 +232,18 @@ class RoomMemberHandler(object):
):
key = (room_id,)
as_id = object()
if requester.app_service:
as_id = requester.app_service.id
with (yield self.member_limiter.queue(as_id)):
with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
)
with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
content=content,
)
defer.returnValue(result)
@@ -858,14 +852,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
"""
# filter ourselves out of remote_room_hosts: do_invite_join ignores it
# and if it is the only entry we'd like to return a 404 rather than a
# 500.
remote_room_hosts = [
host for host in remote_room_hosts if host != self.hs.hostname
]
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")

View File

@@ -30,7 +30,6 @@ import itertools
logger = logging.getLogger(__name__)
SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@@ -170,10 +169,7 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache(
hs, "sync",
timeout_ms=SYNC_RESPONSE_CACHE_MS,
)
self.response_cache = ResponseCache(hs)
self.state = hs.get_state_handler()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@@ -603,7 +599,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
if False and not block_all_presence_data:
if not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)

View File

@@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectError
@@ -31,7 +33,7 @@ SERVER_CACHE = {}
# our record of an individual server which can be tried to reach a destination.
#
# "host" is the hostname acquired from the SRV record. Except when there's
# "host" is actually a dotted-quad or ipv6 address string. Except when there's
# no SRV record, in which case it is the original hostname.
_Server = collections.namedtuple(
"_Server", "priority weight host port expires"
@@ -295,13 +297,20 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
payload = answer.payload
servers.append(_Server(
host=str(payload.target),
port=int(payload.port),
priority=int(payload.priority),
weight=int(payload.weight),
expires=int(clock.time()) + answer.ttl,
))
hosts = yield _get_hosts_for_srv_record(
dns_client, str(payload.target)
)
for (ip, ttl) in hosts:
host_ttl = min(answer.ttl, ttl)
servers.append(_Server(
host=ip,
port=int(payload.port),
priority=int(payload.priority),
weight=int(payload.weight),
expires=int(clock.time()) + host_ttl,
))
servers.sort()
cache[service_name] = list(servers)
@@ -319,3 +328,81 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
raise e
defer.returnValue(servers)
@defer.inlineCallbacks
def _get_hosts_for_srv_record(dns_client, host):
"""Look up each of the hosts in a SRV record
Args:
dns_client (twisted.names.dns.IResolver):
host (basestring): host to look up
Returns:
Deferred[list[(str, int)]]: a list of (host, ttl) pairs
"""
ip4_servers = []
ip6_servers = []
def cb(res):
# lookupAddress and lookupIP6Address return a three-tuple
# giving the answer, authority, and additional sections of the
# response.
#
# we only care about the answers.
return res[0]
def eb(res, record_type):
if res.check(DNSNameError):
return []
logger.warn("Error looking up %s for %s: %s", record_type, host, res)
return res
# no logcontexts here, so we can safely fire these off and gatherResults
d1 = dns_client.lookupAddress(host).addCallbacks(
cb, eb, errbackArgs=("A", ))
d2 = dns_client.lookupIPV6Address(host).addCallbacks(
cb, eb, errbackArgs=("AAAA", ))
results = yield defer.DeferredList(
[d1, d2], consumeErrors=True)
# if all of the lookups failed, raise an exception rather than blowing out
# the cache with an empty result.
if results and all(s == defer.FAILURE for (s, _) in results):
defer.returnValue(results[0][1])
for (success, result) in results:
if success == defer.FAILURE:
continue
for answer in result:
if not answer.payload:
continue
try:
if answer.type == dns.A:
ip = answer.payload.dottedQuad()
ip4_servers.append((ip, answer.ttl))
elif answer.type == dns.AAAA:
ip = socket.inet_ntop(
socket.AF_INET6, answer.payload.address,
)
ip6_servers.append((ip, answer.ttl))
else:
# the most likely candidate here is a CNAME record.
# rfc2782 says srvs may not point to aliases.
logger.warn(
"Ignoring unexpected DNS record type %s for %s",
answer.type, host,
)
continue
except Exception as e:
logger.warn("Ignoring invalid DNS response for %s: %s",
host, e)
continue
# keep the ipv4 results before the ipv6 results, mostly to match historical
# behaviour.
defer.returnValue(ip4_servers + ip6_servers)

View File

@@ -329,7 +329,7 @@ class JsonResource(HttpServer, resource.Resource):
register_paths, so will return (possibly via Deferred) either
None, or a tuple of (http code, response body).
"""
if request.method == b"OPTIONS":
if request.method == "OPTIONS":
return _options_handler, {}
# Loop through all the registered callbacks to check if the method
@@ -543,7 +543,7 @@ def finish_request(request):
def _request_user_agent_is_curl(request):
user_agents = request.requestHeaders.getRawHeaders(
b"User-Agent", default=[]
"User-Agent", default=[]
)
for user_agent in user_agents:
if "curl" in user_agent:

View File

@@ -20,7 +20,7 @@ import logging
import re
import time
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
class SynapseRequest(Request):
@@ -43,12 +43,12 @@ class SynapseRequest(Request):
def get_redacted_uri(self):
return ACCESS_TOKEN_RE.sub(
br'\1<redacted>\3',
r'\1<redacted>\3',
self.uri
)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
def started_processing(self):
self.site.access_logger.info(

View File

@@ -17,13 +17,12 @@ import logging
import functools
import time
import gc
import platform
from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
MemoryUsageMetric, GaugeMetric,
MemoryUsageMetric,
)
from .process_collector import register_process_collector
@@ -31,7 +30,6 @@ from .process_collector import register_process_collector
logger = logging.getLogger(__name__)
running_on_pypy = platform.python_implementation() == 'PyPy'
all_metrics = []
all_collectors = []
@@ -65,13 +63,6 @@ class Metrics(object):
"""
return self._register(CounterMetric, *args, **kwargs)
def register_gauge(self, *args, **kwargs):
"""
Returns:
GaugeMetric
"""
return self._register(GaugeMetric, *args, **kwargs)
def register_callback(self, *args, **kwargs):
"""
Returns:
@@ -151,32 +142,6 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
synapse_metrics = get_metrics_for("synapse")
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = synapse_metrics.register_gauge(
"event_processing_positions", labels=["name"],
)
# Used to track the current max events stream position
event_persisted_position = synapse_metrics.register_gauge(
"event_persisted_position",
)
# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = synapse_metrics.register_gauge(
"event_processing_last_ts", labels=["name"],
)
# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = synapse_metrics.register_gauge(
"event_processing_lag", labels=["name"],
)
def runUntilCurrentTimer(func):
@@ -209,9 +174,6 @@ def runUntilCurrentTimer(func):
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(num_pending)
if running_on_pypy:
return ret
# Check if we need to do a manual GC (since its been disabled), and do
# one if necessary.
threshold = gc.get_threshold()
@@ -244,7 +206,6 @@ try:
# We manually run the GC each reactor tick so that we can get some metrics
# about time spent doing GC,
if not running_on_pypy:
gc.disable()
gc.disable()
except AttributeError:
pass

View File

@@ -115,7 +115,7 @@ class CounterMetric(BaseMetric):
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty tuple).
# (if the metric is a scalar, the (single) key is the empty list).
self.counts = {}
# Scalar metrics are never empty
@@ -145,36 +145,6 @@ class CounterMetric(BaseMetric):
)
class GaugeMetric(BaseMetric):
"""A metric that can go up or down
"""
def __init__(self, *args, **kwargs):
super(GaugeMetric, self).__init__(*args, **kwargs)
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty tuple).
self.guages = {}
def set(self, v, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
self.guages[values] = v
def render(self):
return flatten(
self._render_for_labels(k, self.guages[k])
for k in sorted(self.guages.keys())
)
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
it is rendered. Typically this is used to implement gauges that yield the

View File

@@ -115,7 +115,7 @@ class ReplicationSendEventRestServlet(RestServlet):
self.clock = hs.get_clock()
# The responses are tiny, so we may as well cache them for a while
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
def on_PUT(self, request, event_id):
result = self.response_cache.get(event_id)

View File

@@ -42,8 +42,6 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
self.client_ip_last_seen.prefill(key, now)
self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)

View File

@@ -33,7 +33,7 @@ import logging
logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 500000
MAX_EVENTS_BEHIND = 10000
EventStreamRow = namedtuple("EventStreamRow", (

View File

@@ -296,6 +296,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
)
new_room_id = info["room_id"]
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id)
@@ -333,17 +344,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
kicked_users.append(user_id)
yield self.event_creation_handler.create_and_send_nonmember_event(
room_creator_requester,
{
"type": "m.room.message",
"content": {"body": message, "msgtype": "m.text"},
"room_id": new_room_id,
"sender": new_room_user_id,
},
ratelimit=False,
)
aliases_for_room = yield self.store.get_aliases_for_room(room_id)
yield self.store.update_aliases_for_room(

View File

@@ -44,10 +44,7 @@ class LogoutRestServlet(ClientV1RestServlet):
requester = yield self.auth.get_user_by_req(request)
except AuthError:
# this implies the access token has already been deleted.
defer.returnValue((401, {
"errcode": "M_UNKNOWN_TOKEN",
"error": "Access Token unknown or expired"
}))
pass
else:
if requester.device_id is None:
# the acccess token wasn't associated with a device.

View File

@@ -81,7 +81,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
except Exception:
raise SynapseError(400, "Unable to parse state")
# yield self.presence_handler.set_state(user, state)
yield self.presence_handler.set_state(user, state)
defer.returnValue((200, {}))

View File

@@ -348,9 +348,9 @@ class RegisterRestServlet(ClientV1RestServlet):
admin = register_json.get("admin", None)
# Its important to check as we use null bytes as HMAC field separators
if b"\x00" in user:
if "\x00" in user:
raise SynapseError(400, "Invalid user")
if b"\x00" in password:
if "\x00" in password:
raise SynapseError(400, "Invalid password")
# str() because otherwise hmac complains that 'unicode' does not

View File

@@ -165,12 +165,17 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
content=content,
)
else:
event = yield self.event_creation_hander.create_and_send_nonmember_event(
event, context = yield self.event_creation_hander.create_event(
requester,
event_dict,
token_id=requester.access_token_id,
txn_id=txn_id,
)
yield self.event_creation_hander.send_nonmember_event(
requester, event, context,
)
ret = {}
if event:
ret = {"event_id": event.event_id}

View File

@@ -20,6 +20,7 @@ import synapse
import synapse.types
from synapse.api.auth import get_access_token_from_request, has_access_token
from synapse.api.constants import LoginType
from synapse.types import RoomID, RoomAlias
from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
from synapse.http.servlet import (
RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string
@@ -404,6 +405,14 @@ class RegisterRestServlet(RestServlet):
generate_token=False,
)
# auto-join the user to any rooms we're supposed to dump them into
fake_requester = synapse.types.create_requester(registered_user_id)
for r in self.hs.config.auto_join_rooms:
try:
yield self._join_user_to_room(fake_requester, r)
except Exception as e:
logger.error("Failed to join new user to %r: %r", r, e)
# remember that we've now registered that user account, and with
# what user ID (since the user may not have specified)
self.auth_handler.set_session_data(
@@ -436,6 +445,29 @@ class RegisterRestServlet(RestServlet):
def on_OPTIONS(self, _):
return 200, {}
@defer.inlineCallbacks
def _join_user_to_room(self, requester, room_identifier):
room_id = None
if RoomID.is_valid(room_identifier):
room_id = room_identifier
elif RoomAlias.is_valid(room_identifier):
room_alias = RoomAlias.from_string(room_identifier)
room_id, remote_room_hosts = (
yield self.room_member_handler.lookup_room_alias(room_alias)
)
room_id = room_id.to_string()
else:
raise SynapseError(400, "%s was not legal room ID or room alias" % (
room_identifier,
))
yield self.room_member_handler.update_membership(
requester=requester,
target=requester.user,
room_id=room_id,
action="join",
)
@defer.inlineCallbacks
def _do_appservice_registration(self, username, as_token, body):
user_id = yield self.registration_handler.appservice_register(

View File

@@ -16,8 +16,6 @@
from twisted.internet import defer, threads
from twisted.protocols.basic import FileSender
import six
from ._base import Responder
from synapse.util.file_consumer import BackgroundFileConsumer
@@ -121,7 +119,7 @@ class MediaStorage(object):
os.remove(fname)
except Exception:
pass
six.reraise(t, v, tb)
raise t, v, tb
if not finished_called:
raise Exception("Finished callback not called")

View File

@@ -266,16 +266,16 @@ class DataStore(RoomMemberStore, RoomStore,
def count_r30_users(self):
"""
Counts the number of 30 day retained users, defined as:-
* Users who have created their accounts more than 30 days ago
* Users who have created their accounts more than 30 days
* Where last seen at most 30 days ago
* Where account creation and last_seen are > 30 days apart
* Where account creation and last_seen are > 30 days
Returns counts globaly for a given user as well as breaking
by platform
"""
def _count_r30_users(txn):
thirty_days_in_secs = 86400 * 30
now = int(self._clock.time())
now = int(self._clock.time_msec())
thirty_days_ago_in_secs = now - thirty_days_in_secs
sql = """
@@ -289,11 +289,11 @@ class DataStore(RoomMemberStore, RoomStore,
user_id,
last_seen,
CASE
WHEN user_agent LIKE '%%Android%%' THEN 'android'
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
WHEN user_agent LIKE '%Android%' THEN 'android'
WHEN user_agent LIKE '%iOS%' THEN 'ios'
WHEN user_agent LIKE '%Electron%' THEN 'electron'
WHEN user_agent LIKE '%Mozilla%' THEN 'web'
WHEN user_agent LIKE '%Gecko%' THEN 'web'
ELSE 'unknown'
END
AS platform

View File

@@ -376,7 +376,7 @@ class SQLBaseStore(object):
Returns:
A list of dicts where the key is the column header.
"""
col_headers = list(intern(str(column[0])) for column in cursor.description)
col_headers = list(intern(column[0]) for column in cursor.description)
results = list(
dict(zip(col_headers, row)) for row in cursor
)

View File

@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
LAST_SEEN_GRANULARITY = 10 * 60 * 1000
LAST_SEEN_GRANULARITY = 120 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore):

View File

@@ -18,7 +18,6 @@ from .postgres import PostgresEngine
from .sqlite3 import Sqlite3Engine
import importlib
import platform
SUPPORTED_MODULE = {
@@ -32,10 +31,6 @@ def create_engine(database_config):
engine_class = SUPPORTED_MODULE.get(name, None)
if engine_class:
# pypy requires psycopg2cffi rather than psycopg2
if (name == "psycopg2" and
platform.python_implementation() == "PyPy"):
name = "psycopg2cffi"
module = importlib.import_module(name)
return engine_class(module, database_config)

View File

@@ -613,9 +613,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
self._rotate_notifs, 30 * 60 * 1000
)
self._rotate_delay = 3
self._rotate_count = 10000
def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
all_events_and_contexts):
"""Handles moving push actions from staging table to main
@@ -812,7 +809,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
if caught_up:
break
yield sleep(self._rotate_delay)
yield sleep(5)
finally:
self._doing_notif_rotation = False
@@ -833,8 +830,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn.execute("""
SELECT stream_ordering FROM event_push_actions
WHERE stream_ordering > ?
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
""", (old_rotate_stream_ordering, self._rotate_count))
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
""", (old_rotate_stream_ordering,))
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row

View File

@@ -444,9 +444,6 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
synapse.metrics.event_persisted_position.set(
chunk[-1][0].internal_metadata.stream_ordering,
)
for event, context in chunk:
if context.app_service:
origin_type = "local"
@@ -1013,6 +1010,7 @@ class EventsStore(EventsWorkerStore):
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
@@ -1032,14 +1030,6 @@ class EventsStore(EventsWorkerStore):
[(ev.event_id,) for ev, _ in events_and_contexts]
)
for table in (
"event_push_actions",
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts]
)
def _store_event_txn(self, txn, events_and_contexts):
"""Insert new events into the event and event_json tables

View File

@@ -51,26 +51,6 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
class EventsWorkerStore(SQLBaseStore):
def get_received_ts(self, event_id):
"""Get received_ts (when it was persisted) for the event.
Raises an exception for unknown events.
Args:
event_id (str)
Returns:
Deferred[int|None]: Timestamp in milliseconds, or None for events
that were persisted before received_ts was implemented.
"""
return self._simple_select_one_onecol(
table="events",
keyvalues={
"event_id": event_id,
},
retcol="received_ts",
desc="get_received_ts",
)
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,

View File

@@ -65,7 +65,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
defer.returnValue(hosts)
@cachedInlineCallbacks(max_entries=100000, iterable=True)
@cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
sql = (
@@ -79,14 +79,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql, (room_id, Membership.JOIN,))
return [to_ascii(r[0]) for r in txn]
start_time = self._clock.time_msec()
result = yield self.runInteraction("get_users_in_room", f)
end_time = self._clock.time_msec()
logger.info(
"Fetched room membership for %s (%i users) in %i ms",
room_id, len(result), end_time - start_time,
)
defer.returnValue(result)
return self.runInteraction("get_users_in_room", f)
@cached()
def get_invited_rooms_for_user(self, user_id):
@@ -460,7 +453,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
defer.returnValue(joined_hosts)
@cached(max_entries=10000)
@cached(max_entries=10000, iterable=True)
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)

View File

@@ -721,7 +721,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
return " & ".join(result for result in results)
return " & ".join(result + ":*" for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:

View File

@@ -20,7 +20,7 @@ from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
from synapse.util.caches import intern_string, get_cache_factor_for
from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
@@ -54,7 +54,7 @@ class StateGroupWorkerStore(SQLBaseStore):
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
self._state_group_cache = DictionaryCache(
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
)
@cached(max_entries=100000, iterable=True)
@@ -566,7 +566,8 @@ class StateGroupWorkerStore(SQLBaseStore):
state_dict = results[group]
state_dict.update(
group_state_dict.iteritems()
((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
for k, v in group_state_dict.iteritems()
)
self._state_group_cache.update(

View File

@@ -169,7 +169,7 @@ class DomainSpecificString(
except Exception:
return False
__repr__ = to_string
__str__ = to_string
class UserID(DomainSpecificString):

View File

@@ -18,16 +18,6 @@ import os
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
def get_cache_factor_for(cache_name):
env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
factor = os.environ.get(env_var)
if factor:
return float(factor)
return CACHE_SIZE_FACTOR
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
caches_by_name = {}

View File

@@ -17,7 +17,7 @@ import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.stringutils import to_ascii
@@ -310,7 +310,7 @@ class CacheDescriptor(_CacheDescriptorBase):
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
cache_context=cache_context)
max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
max_entries = int(max_entries * CACHE_SIZE_FACTOR)
self.max_entries = max_entries
self.tree = tree

View File

@@ -14,7 +14,6 @@
# limitations under the License.
from synapse.util.async import ObservableDeferred
from synapse.util.caches import metrics as cache_metrics
class ResponseCache(object):
@@ -25,63 +24,20 @@ class ResponseCache(object):
used rather than trying to compute a new response.
"""
def __init__(self, hs, name, timeout_ms=0):
def __init__(self, hs, timeout_ms=0):
self.pending_result_cache = {} # Requests that haven't finished yet.
self.clock = hs.get_clock()
self.timeout_sec = timeout_ms / 1000.
self._metrics = cache_metrics.register_cache(
"response_cache",
size_callback=lambda: self.size(),
cache_name=name,
)
def size(self):
return len(self.pending_result_cache)
def get(self, key):
"""Look up the given key.
Returns a deferred which doesn't follow the synapse logcontext rules,
so you'll probably want to make_deferred_yieldable it.
Args:
key (str):
Returns:
twisted.internet.defer.Deferred|None: None if there is no entry
for this key; otherwise a deferred result.
"""
result = self.pending_result_cache.get(key)
if result is not None:
self._metrics.inc_hits()
return result.observe()
else:
self._metrics.inc_misses()
return None
def set(self, key, deferred):
"""Set the entry for the given key to the given deferred.
*deferred* should run its callbacks in the sentinel logcontext (ie,
you should wrap normal synapse deferreds with
logcontext.run_in_background).
Returns a new Deferred which also doesn't follow the synapse logcontext
rules, so you will want to make_deferred_yieldable it
(TODO: before using this more widely, it might make sense to refactor
it and get() so that they do the necessary wrapping rather than having
to do it everywhere ResponseCache is used.)
Args:
key (str):
deferred (twisted.internet.defer.Deferred):
Returns:
twisted.internet.defer.Deferred
"""
result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result

View File

@@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests import unittest
from synapse.federation.transport.client import _create_path
class PathEncodeTestCase(unittest.TestCase):
def test_path_encoding(self):
P = "/foo"
self.assertEqual("/foo/bar", _create_path(P, "/bar"))
self.assertEqual("/foo/bar/e", _create_path(P, "/bar/%s", "e"))
self.assertEqual("/foo/bar/%24e", _create_path(P, "/bar/%s", "$e"))
self.assertEqual("/foo/bar/%2Fe%2F", _create_path(P, "/bar/%s", "/e/"))
self.assertEqual("/foo/bar/x/y", _create_path(P, "/bar/%s/%s", "x", "y"))

View File

@@ -31,7 +31,6 @@ class AppServiceHandlerTestCase(unittest.TestCase):
self.mock_scheduler = Mock()
hs = Mock()
hs.get_datastore = Mock(return_value=self.mock_store)
self.mock_store.get_received_ts.return_value = 0
hs.get_application_service_api = Mock(return_value=self.mock_as_api)
hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
hs.get_clock.return_value = MockClock()

View File

@@ -123,7 +123,6 @@ class EventStreamPermissionsTestCase(RestTestCase):
self.ratelimiter.send_message.return_value = (True, 0)
hs.config.enable_registration_captcha = False
hs.config.enable_registration = True
hs.config.auto_join_rooms = []
hs.get_handlers().federation_handler = Mock()

View File

@@ -984,13 +984,11 @@ class RoomInitialSyncTestCase(RestTestCase):
self.assertTrue("presence" in response)
# presence is turned off on hotfixes
# presence_by_user = {
# e["content"]["user_id"]: e for e in response["presence"]
# }
# self.assertTrue(self.user_id in presence_by_user)
# self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
presence_by_user = {
e["content"]["user_id"]: e for e in response["presence"]
}
self.assertTrue(self.user_id in presence_by_user)
self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
class RoomMessageListTestCase(RestTestCase):

View File

@@ -33,6 +33,8 @@ class DnsTestCase(unittest.TestCase):
service_name = "test_service.example.com"
host_name = "example.com"
ip_address = "127.0.0.1"
ip6_address = "::1"
answer_srv = dns.RRHeader(
type=dns.SRV,
@@ -41,9 +43,29 @@ class DnsTestCase(unittest.TestCase):
)
)
answer_a = dns.RRHeader(
type=dns.A,
payload=dns.Record_A(
address=ip_address,
)
)
answer_aaaa = dns.RRHeader(
type=dns.AAAA,
payload=dns.Record_AAAA(
address=ip6_address,
)
)
dns_client_mock.lookupService.return_value = defer.succeed(
([answer_srv], None, None),
)
dns_client_mock.lookupAddress.return_value = defer.succeed(
([answer_a], None, None),
)
dns_client_mock.lookupIPV6Address.return_value = defer.succeed(
([answer_aaaa], None, None),
)
cache = {}
@@ -52,10 +74,13 @@ class DnsTestCase(unittest.TestCase):
)
dns_client_mock.lookupService.assert_called_once_with(service_name)
dns_client_mock.lookupAddress.assert_called_once_with(host_name)
dns_client_mock.lookupIPV6Address.assert_called_once_with(host_name)
self.assertEquals(len(servers), 1)
self.assertEquals(len(servers), 2)
self.assertEquals(servers, cache[service_name])
self.assertEquals(servers[0].host, host_name)
self.assertEquals(servers[0].host, ip_address)
self.assertEquals(servers[1].host, ip6_address)
@defer.inlineCallbacks
def test_from_cache_expired_and_dns_fail(self):

View File

@@ -212,7 +212,7 @@ class MockHttpResource(HttpServer):
headers = {}
if federation_auth:
headers[b"Authorization"] = ["X-Matrix origin=test,key=,sig="]
headers["Authorization"] = ["X-Matrix origin=test,key=,sig="]
mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers)
# return the right path if the event requires it