Compare commits
117 Commits
erikj/urle
...
erik-hacke
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f56db48f67 | ||
|
|
d3347ad485 | ||
|
|
dd7f1b10ce | ||
|
|
d5c74b9f6c | ||
|
|
0f13f30fca | ||
|
|
415aeefd89 | ||
|
|
19ceb4851f | ||
|
|
261124396e | ||
|
|
23a7f9d7f4 | ||
|
|
d7bf3a68f0 | ||
|
|
f67e906e18 | ||
|
|
971059a733 | ||
|
|
e939f3bca6 | ||
|
|
4dae4a97ed | ||
|
|
92e34615c5 | ||
|
|
ab825aa328 | ||
|
|
233699c42e | ||
|
|
121591568b | ||
|
|
b3384232a0 | ||
|
|
360d899a64 | ||
|
|
d54cfbb7a8 | ||
|
|
eaa2ebf20b | ||
|
|
9daf82278f | ||
|
|
661a4d4603 | ||
|
|
dd723267b2 | ||
|
|
a060dfa132 | ||
|
|
f8e8ec013b | ||
|
|
a401750ef4 | ||
|
|
1246d23710 | ||
|
|
d49cbf712f | ||
|
|
ce72d590ed | ||
|
|
6d7f0f8dd3 | ||
|
|
f4284d943a | ||
|
|
d1e56cfcd1 | ||
|
|
89de934981 | ||
|
|
9fbe70a7dc | ||
|
|
a3599dda97 | ||
|
|
87478c5a60 | ||
|
|
c508b2f2f0 | ||
|
|
37354b55c9 | ||
|
|
0e9aa1d091 | ||
|
|
8eaa141d8f | ||
|
|
664adb4236 | ||
|
|
aea3a93611 | ||
|
|
41e0611895 | ||
|
|
61b439c904 | ||
|
|
87770300d5 | ||
|
|
9a311adfea | ||
|
|
64bc2162ef | ||
|
|
d2c6f4d626 | ||
|
|
5232d3bfb1 | ||
|
|
5e785d4d5b | ||
|
|
72c673407f | ||
|
|
6e025a97b4 | ||
|
|
414b2b3bd1 | ||
|
|
b151eb14a2 | ||
|
|
64cebbc730 | ||
|
|
d9ae2bc826 | ||
|
|
21d5a2a08e | ||
|
|
c115deed12 | ||
|
|
072fb59446 | ||
|
|
89dda61315 | ||
|
|
687f3451bd | ||
|
|
f3ef60662f | ||
|
|
e5082494eb | ||
|
|
56b0589865 | ||
|
|
11974f3787 | ||
|
|
145d14656b | ||
|
|
e54c202b81 | ||
|
|
b0500d3774 | ||
|
|
4f40d058cc | ||
|
|
488a2696b6 | ||
|
|
052df50e49 | ||
|
|
c7ede92d0b | ||
|
|
3633ee8c63 | ||
|
|
912c456d60 | ||
|
|
d05575a574 | ||
|
|
2fb6d46042 | ||
|
|
6168351877 | ||
|
|
72251d1b97 | ||
|
|
a9a74101a4 | ||
|
|
6152e253d8 | ||
|
|
c7fa6672d9 | ||
|
|
36c43eef17 | ||
|
|
411c437fc6 | ||
|
|
6783bc0ded | ||
|
|
91ea0202e6 | ||
|
|
295973e3a9 | ||
|
|
3b78452c3a | ||
|
|
af7ed8e1ef | ||
|
|
1f732e05d0 | ||
|
|
9ed48cecbc | ||
|
|
cc9d4a4d3e | ||
|
|
889803a78e | ||
|
|
22245fb8a7 | ||
|
|
78c5eca141 | ||
|
|
55bef59cc7 | ||
|
|
ad683cd203 | ||
|
|
d66afef01e | ||
|
|
b07a33f024 | ||
|
|
74539aa25b | ||
|
|
8f25cd6627 | ||
|
|
dc1299a4b0 | ||
|
|
2c72d66cda | ||
|
|
cde90a89ed | ||
|
|
5aec53ad95 | ||
|
|
d10d19f0ad | ||
|
|
daec1d77be | ||
|
|
61885f7849 | ||
|
|
ba30d489d9 | ||
|
|
5e2d0650df | ||
|
|
841bcbcafa | ||
|
|
08a6b88e3d | ||
|
|
aece8e73b1 | ||
|
|
328bd35e00 | ||
|
|
7de9a28b8e | ||
|
|
2a9c3aea89 |
55
CHANGES.rst
55
CHANGES.rst
@@ -1,3 +1,58 @@
|
||||
Changes in synapse v0.27.3-rc2 (2018-04-09)
|
||||
==========================================
|
||||
v0.27.3-rc1 used a stale version of the develop branch so the changelog overstates
|
||||
the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored.
|
||||
|
||||
Changes in synapse v0.27.3-rc1 (2018-04-09)
|
||||
=======================================
|
||||
|
||||
Notable changes include API support for joinability of groups. Also new metrics
|
||||
and phone home stats. Phone home stats include better visibility of system usage
|
||||
so we can tweak synpase to work better for all users rather than our own experience
|
||||
with matrix.org. Also, recording 'r30' stat which is the measure we use to track
|
||||
overal growth of the Matrix ecosystem. It is defined as:-
|
||||
|
||||
Counts the number of native 30 day retained users, defined as:-
|
||||
* Users who have created their accounts more than 30 days
|
||||
* Where last seen at most 30 days ago
|
||||
* Where account creation and last_seen are > 30 days"
|
||||
|
||||
|
||||
Features:
|
||||
|
||||
* Add joinability for groups (PR #3045)
|
||||
* Implement group join API (PR #3046)
|
||||
* Add counter metrics for calculating state delta (PR #3033)
|
||||
* R30 stats (PR #3041)
|
||||
* Measure time it takes to calculate state group ID (PR #3043)
|
||||
* Add basic performance statistics to phone home (PR #3044)
|
||||
* Add response size metrics (PR #3071)
|
||||
* phone home cache size configurations (PR #3063)
|
||||
|
||||
Changes:
|
||||
|
||||
* Add a blurb explaining the main synapse worker (PR #2886) Thanks to @turt2live!
|
||||
* Replace old style error catching with 'as' keyword (PR #3000) Thanks to @NotAFile!
|
||||
* Use .iter* to avoid copies in StateHandler (PR #3006)
|
||||
* Linearize calls to _generate_user_id (PR #3029)
|
||||
* Remove last usage of ujson (PR #3030)
|
||||
* Use simplejson throughout (PR #3048)
|
||||
* Use static JSONEncoders (PR #3049)
|
||||
* Remove uses of events.content (PR #3060)
|
||||
* Improve database cache performance (PR #3068)
|
||||
|
||||
Bug fixes:
|
||||
|
||||
* Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
|
||||
* Fix replication after switch to simplejson (PR #3015)
|
||||
* Fix replication after switch to simplejson (PR #3015)
|
||||
* 404 correctly on missing paths via NoResource (PR #3022)
|
||||
* Fix error when claiming e2e keys from offline servers (PR #3034)
|
||||
* fix tests/storage/test_user_directory.py (PR #3042)
|
||||
* use PUT instead of POST for federating groups/m.join_policy (PR #3070) Thanks to @krombel!
|
||||
* postgres port script: fix state_groups_pkey error (PR #3072)
|
||||
|
||||
|
||||
Changes in synapse v0.27.2 (2018-03-26)
|
||||
=======================================
|
||||
|
||||
|
||||
@@ -22,6 +22,8 @@ import argparse
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.util.frozenutils import unfreeze
|
||||
|
||||
from six import string_types
|
||||
|
||||
|
||||
def make_graph(file_name, room_id, file_prefix, limit):
|
||||
print "Reading lines"
|
||||
@@ -58,7 +60,7 @@ def make_graph(file_name, room_id, file_prefix, limit):
|
||||
for key, value in unfreeze(event.get_dict()["content"]).items():
|
||||
if value is None:
|
||||
value = "<null>"
|
||||
elif isinstance(value, basestring):
|
||||
elif isinstance(value, string_types):
|
||||
pass
|
||||
else:
|
||||
value = json.dumps(value)
|
||||
|
||||
@@ -202,11 +202,11 @@ new PromConsole.Graph({
|
||||
<h1>Requests</h1>
|
||||
|
||||
<h3>Requests by Servlet</h3>
|
||||
<div id="synapse_http_server_requests_servlet"></div>
|
||||
<div id="synapse_http_server_request_count_servlet"></div>
|
||||
<script>
|
||||
new PromConsole.Graph({
|
||||
node: document.querySelector("#synapse_http_server_requests_servlet"),
|
||||
expr: "rate(synapse_http_server_requests:servlet[2m])",
|
||||
node: document.querySelector("#synapse_http_server_request_count_servlet"),
|
||||
expr: "rate(synapse_http_server_request_count:servlet[2m])",
|
||||
name: "[[servlet]]",
|
||||
yAxisFormatter: PromConsole.NumberFormatter.humanize,
|
||||
yHoverFormatter: PromConsole.NumberFormatter.humanize,
|
||||
@@ -215,11 +215,11 @@ new PromConsole.Graph({
|
||||
})
|
||||
</script>
|
||||
<h4> (without <tt>EventStreamRestServlet</tt> or <tt>SyncRestServlet</tt>)</h4>
|
||||
<div id="synapse_http_server_requests_servlet_minus_events"></div>
|
||||
<div id="synapse_http_server_request_count_servlet_minus_events"></div>
|
||||
<script>
|
||||
new PromConsole.Graph({
|
||||
node: document.querySelector("#synapse_http_server_requests_servlet_minus_events"),
|
||||
expr: "rate(synapse_http_server_requests:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
|
||||
node: document.querySelector("#synapse_http_server_request_count_servlet_minus_events"),
|
||||
expr: "rate(synapse_http_server_request_count:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
|
||||
name: "[[servlet]]",
|
||||
yAxisFormatter: PromConsole.NumberFormatter.humanize,
|
||||
yHoverFormatter: PromConsole.NumberFormatter.humanize,
|
||||
@@ -233,7 +233,7 @@ new PromConsole.Graph({
|
||||
<script>
|
||||
new PromConsole.Graph({
|
||||
node: document.querySelector("#synapse_http_server_response_time_avg"),
|
||||
expr: "rate(synapse_http_server_response_time:total[2m]) / rate(synapse_http_server_response_time:count[2m]) / 1000",
|
||||
expr: "rate(synapse_http_server_response_time_seconds[2m]) / rate(synapse_http_server_response_count[2m]) / 1000",
|
||||
name: "[[servlet]]",
|
||||
yAxisFormatter: PromConsole.NumberFormatter.humanize,
|
||||
yHoverFormatter: PromConsole.NumberFormatter.humanize,
|
||||
@@ -276,7 +276,7 @@ new PromConsole.Graph({
|
||||
<script>
|
||||
new PromConsole.Graph({
|
||||
node: document.querySelector("#synapse_http_server_response_ru_utime"),
|
||||
expr: "rate(synapse_http_server_response_ru_utime:total[2m])",
|
||||
expr: "rate(synapse_http_server_response_ru_utime_seconds[2m])",
|
||||
name: "[[servlet]]",
|
||||
yAxisFormatter: PromConsole.NumberFormatter.humanize,
|
||||
yHoverFormatter: PromConsole.NumberFormatter.humanize,
|
||||
@@ -291,7 +291,7 @@ new PromConsole.Graph({
|
||||
<script>
|
||||
new PromConsole.Graph({
|
||||
node: document.querySelector("#synapse_http_server_response_db_txn_duration"),
|
||||
expr: "rate(synapse_http_server_response_db_txn_duration:total[2m])",
|
||||
expr: "rate(synapse_http_server_response_db_txn_duration_seconds[2m])",
|
||||
name: "[[servlet]]",
|
||||
yAxisFormatter: PromConsole.NumberFormatter.humanize,
|
||||
yHoverFormatter: PromConsole.NumberFormatter.humanize,
|
||||
@@ -306,7 +306,7 @@ new PromConsole.Graph({
|
||||
<script>
|
||||
new PromConsole.Graph({
|
||||
node: document.querySelector("#synapse_http_server_send_time_avg"),
|
||||
expr: "rate(synapse_http_server_response_time:total{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_time:count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
|
||||
expr: "rate(synapse_http_server_response_time_second{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
|
||||
name: "[[servlet]]",
|
||||
yAxisFormatter: PromConsole.NumberFormatter.humanize,
|
||||
yHoverFormatter: PromConsole.NumberFormatter.humanize,
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)
|
||||
synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)
|
||||
|
||||
synapse_http_server_requests:method{servlet=""} = sum(synapse_http_server_requests) by (method)
|
||||
synapse_http_server_requests:servlet{method=""} = sum(synapse_http_server_requests) by (servlet)
|
||||
synapse_http_server_request_count:method{servlet=""} = sum(synapse_http_server_request_count) by (method)
|
||||
synapse_http_server_request_count:servlet{method=""} = sum(synapse_http_server_request_count) by (servlet)
|
||||
|
||||
synapse_http_server_requests:total{servlet=""} = sum(synapse_http_server_requests:by_method) by (servlet)
|
||||
synapse_http_server_request_count:total{servlet=""} = sum(synapse_http_server_request_count:by_method) by (servlet)
|
||||
|
||||
synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])
|
||||
synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])
|
||||
|
||||
@@ -5,19 +5,19 @@ groups:
|
||||
expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)"
|
||||
- record: "synapse_federation_transaction_queue_pendingPdus:total"
|
||||
expr: "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)"
|
||||
- record: 'synapse_http_server_requests:method'
|
||||
- record: 'synapse_http_server_request_count:method'
|
||||
labels:
|
||||
servlet: ""
|
||||
expr: "sum(synapse_http_server_requests) by (method)"
|
||||
- record: 'synapse_http_server_requests:servlet'
|
||||
expr: "sum(synapse_http_server_request_count) by (method)"
|
||||
- record: 'synapse_http_server_request_count:servlet'
|
||||
labels:
|
||||
method: ""
|
||||
expr: 'sum(synapse_http_server_requests) by (servlet)'
|
||||
expr: 'sum(synapse_http_server_request_count) by (servlet)'
|
||||
|
||||
- record: 'synapse_http_server_requests:total'
|
||||
- record: 'synapse_http_server_request_count:total'
|
||||
labels:
|
||||
servlet: ""
|
||||
expr: 'sum(synapse_http_server_requests:by_method) by (servlet)'
|
||||
expr: 'sum(synapse_http_server_request_count:by_method) by (servlet)'
|
||||
|
||||
- record: 'synapse_cache:hit_ratio_5m'
|
||||
expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])'
|
||||
|
||||
@@ -30,6 +30,8 @@ import time
|
||||
import traceback
|
||||
import yaml
|
||||
|
||||
from six import string_types
|
||||
|
||||
|
||||
logger = logging.getLogger("synapse_port_db")
|
||||
|
||||
@@ -574,7 +576,7 @@ class Porter(object):
|
||||
def conv(j, col):
|
||||
if j in bool_cols:
|
||||
return bool(col)
|
||||
elif isinstance(col, basestring) and "\0" in col:
|
||||
elif isinstance(col, string_types) and "\0" in col:
|
||||
logger.warn("DROPPING ROW: NUL value in table %s col %s: %r", table, headers[j], col)
|
||||
raise BadValueException();
|
||||
return col
|
||||
|
||||
@@ -16,4 +16,4 @@
|
||||
""" This is a reference implementation of a Matrix home server.
|
||||
"""
|
||||
|
||||
__version__ = "0.27.2"
|
||||
__version__ = "0.27.3-rc2"
|
||||
|
||||
@@ -204,8 +204,8 @@ class Auth(object):
|
||||
|
||||
ip_addr = self.hs.get_ip_from_request(request)
|
||||
user_agent = request.requestHeaders.getRawHeaders(
|
||||
"User-Agent",
|
||||
default=[""]
|
||||
b"User-Agent",
|
||||
default=[b""]
|
||||
)[0]
|
||||
if user and access_token and ip_addr:
|
||||
self.store.insert_client_ip(
|
||||
@@ -672,7 +672,7 @@ def has_access_token(request):
|
||||
bool: False if no access_token was given, True otherwise.
|
||||
"""
|
||||
query_params = request.args.get("access_token")
|
||||
auth_headers = request.requestHeaders.getRawHeaders("Authorization")
|
||||
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
|
||||
return bool(query_params) or bool(auth_headers)
|
||||
|
||||
|
||||
@@ -692,8 +692,8 @@ def get_access_token_from_request(request, token_not_found_http_status=401):
|
||||
AuthError: If there isn't an access_token in the request.
|
||||
"""
|
||||
|
||||
auth_headers = request.requestHeaders.getRawHeaders("Authorization")
|
||||
query_params = request.args.get("access_token")
|
||||
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
|
||||
query_params = request.args.get(b"access_token")
|
||||
if auth_headers:
|
||||
# Try the get the access_token from a "Authorization: Bearer"
|
||||
# header
|
||||
|
||||
@@ -36,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
||||
from synapse.replication.slave.storage.devices import SlavedDeviceStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
|
||||
from synapse.rest.client.v2_alpha._base import client_v2_patterns
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
@@ -49,6 +50,35 @@ from twisted.web.resource import NoResource
|
||||
logger = logging.getLogger("synapse.app.frontend_proxy")
|
||||
|
||||
|
||||
class PresenceStatusStubServlet(ClientV1RestServlet):
|
||||
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
|
||||
|
||||
def __init__(self, hs):
|
||||
super(PresenceStatusStubServlet, self).__init__(hs)
|
||||
self.http_client = hs.get_simple_http_client()
|
||||
self.auth = hs.get_auth()
|
||||
self.main_uri = hs.config.worker_main_http_uri
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_GET(self, request, user_id):
|
||||
# Pass through the auth headers, if any, in case the access token
|
||||
# is there.
|
||||
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
|
||||
headers = {
|
||||
"Authorization": auth_headers,
|
||||
}
|
||||
result = yield self.http_client.get_json(
|
||||
self.main_uri + request.uri,
|
||||
headers=headers,
|
||||
)
|
||||
defer.returnValue((200, result))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, request, user_id):
|
||||
yield self.auth.get_user_by_req(request)
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
class KeyUploadServlet(RestServlet):
|
||||
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
|
||||
|
||||
@@ -90,7 +120,7 @@ class KeyUploadServlet(RestServlet):
|
||||
# They're actually trying to upload something, proxy to main synapse.
|
||||
# Pass through the auth headers, if any, in case the access token
|
||||
# is there.
|
||||
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
|
||||
auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
|
||||
headers = {
|
||||
"Authorization": auth_headers,
|
||||
}
|
||||
@@ -135,6 +165,7 @@ class FrontendProxyServer(HomeServer):
|
||||
elif name == "client":
|
||||
resource = JsonResource(self, canonical_json=False)
|
||||
KeyUploadServlet(self).register(resource)
|
||||
PresenceStatusStubServlet(self).register(resource)
|
||||
resources.update({
|
||||
"/_matrix/client/r0": resource,
|
||||
"/_matrix/client/unstable": resource,
|
||||
|
||||
@@ -113,6 +113,7 @@ class SynchrotronPresence(object):
|
||||
logger.info("Presence process_id is %r", self.process_id)
|
||||
|
||||
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
|
||||
return
|
||||
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
|
||||
|
||||
def mark_as_coming_online(self, user_id):
|
||||
@@ -210,6 +211,8 @@ class SynchrotronPresence(object):
|
||||
yield self.notify_from_replication(states, stream_id)
|
||||
|
||||
def get_currently_syncing_users(self):
|
||||
# presence is disabled on matrix.org, so we return the empty set
|
||||
return set()
|
||||
return [
|
||||
user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
|
||||
if count > 0
|
||||
|
||||
@@ -108,7 +108,7 @@ def stop(pidfile, app):
|
||||
|
||||
|
||||
Worker = collections.namedtuple("Worker", [
|
||||
"app", "configfile", "pidfile", "cache_factor"
|
||||
"app", "configfile", "pidfile", "cache_factor", "cache_factors",
|
||||
])
|
||||
|
||||
|
||||
@@ -171,6 +171,10 @@ def main():
|
||||
if cache_factor:
|
||||
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
|
||||
|
||||
cache_factors = config.get("synctl_cache_factors", {})
|
||||
for cache_name, factor in cache_factors.iteritems():
|
||||
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
||||
|
||||
worker_configfiles = []
|
||||
if options.worker:
|
||||
start_stop_synapse = False
|
||||
@@ -211,6 +215,10 @@ def main():
|
||||
or pidfile
|
||||
)
|
||||
worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
|
||||
worker_cache_factors = (
|
||||
worker_config.get("synctl_cache_factors")
|
||||
or cache_factors
|
||||
)
|
||||
daemonize = worker_config.get("daemonize") or config.get("daemonize")
|
||||
assert daemonize, "Main process must have daemonize set to true"
|
||||
|
||||
@@ -226,8 +234,10 @@ def main():
|
||||
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
|
||||
worker_configfile, "worker_daemonize")
|
||||
worker_cache_factor = worker_config.get("synctl_cache_factor")
|
||||
worker_cache_factors = worker_config.get("synctl_cache_factors", {})
|
||||
workers.append(Worker(
|
||||
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
|
||||
worker_cache_factors,
|
||||
))
|
||||
|
||||
action = options.action
|
||||
@@ -262,15 +272,19 @@ def main():
|
||||
start(configfile)
|
||||
|
||||
for worker in workers:
|
||||
env = os.environ.copy()
|
||||
|
||||
if worker.cache_factor:
|
||||
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
|
||||
|
||||
for cache_name, factor in worker.cache_factors.iteritems():
|
||||
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
|
||||
|
||||
start_worker(worker.app, configfile, worker.configfile)
|
||||
|
||||
if cache_factor:
|
||||
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
|
||||
else:
|
||||
os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
|
||||
# Reset env back to the original
|
||||
os.environ.clear()
|
||||
os.environ.update(env)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -21,6 +21,8 @@ from twisted.internet import defer
|
||||
import logging
|
||||
import re
|
||||
|
||||
from six import string_types
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -146,7 +148,7 @@ class ApplicationService(object):
|
||||
)
|
||||
|
||||
regex = regex_obj.get("regex")
|
||||
if isinstance(regex, basestring):
|
||||
if isinstance(regex, string_types):
|
||||
regex_obj["regex"] = re.compile(regex) # Pre-compile regex
|
||||
else:
|
||||
raise ValueError(
|
||||
|
||||
@@ -73,7 +73,8 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||
super(ApplicationServiceApi, self).__init__(hs)
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
self.protocol_meta_cache = ResponseCache(hs, timeout_ms=HOUR_IN_MS)
|
||||
self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
|
||||
timeout_ms=HOUR_IN_MS)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def query_user(self, service, user_id):
|
||||
|
||||
@@ -19,6 +19,8 @@ import os
|
||||
import yaml
|
||||
from textwrap import dedent
|
||||
|
||||
from six import integer_types
|
||||
|
||||
|
||||
class ConfigError(Exception):
|
||||
pass
|
||||
@@ -49,7 +51,7 @@ Missing mandatory `server_name` config option.
|
||||
class Config(object):
|
||||
@staticmethod
|
||||
def parse_size(value):
|
||||
if isinstance(value, int) or isinstance(value, long):
|
||||
if isinstance(value, integer_types):
|
||||
return value
|
||||
sizes = {"K": 1024, "M": 1024 * 1024}
|
||||
size = 1
|
||||
@@ -61,7 +63,7 @@ class Config(object):
|
||||
|
||||
@staticmethod
|
||||
def parse_duration(value):
|
||||
if isinstance(value, int) or isinstance(value, long):
|
||||
if isinstance(value, integer_types):
|
||||
return value
|
||||
second = 1000
|
||||
minute = 60 * second
|
||||
@@ -288,22 +290,22 @@ class Config(object):
|
||||
)
|
||||
obj.invoke_all("generate_files", config)
|
||||
config_file.write(config_bytes)
|
||||
print (
|
||||
print((
|
||||
"A config file has been generated in %r for server name"
|
||||
" %r with corresponding SSL keys and self-signed"
|
||||
" certificates. Please review this file and customise it"
|
||||
" to your needs."
|
||||
) % (config_path, server_name)
|
||||
print (
|
||||
) % (config_path, server_name))
|
||||
print(
|
||||
"If this server name is incorrect, you will need to"
|
||||
" regenerate the SSL certificates"
|
||||
)
|
||||
return
|
||||
else:
|
||||
print (
|
||||
print((
|
||||
"Config file %r already exists. Generating any missing key"
|
||||
" files."
|
||||
) % (config_path,)
|
||||
) % (config_path,))
|
||||
generate_keys = True
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
|
||||
@@ -21,6 +21,8 @@ import urllib
|
||||
import yaml
|
||||
import logging
|
||||
|
||||
from six import string_types
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -89,14 +91,14 @@ def _load_appservice(hostname, as_info, config_filename):
|
||||
"id", "as_token", "hs_token", "sender_localpart"
|
||||
]
|
||||
for field in required_string_fields:
|
||||
if not isinstance(as_info.get(field), basestring):
|
||||
if not isinstance(as_info.get(field), string_types):
|
||||
raise KeyError("Required string field: '%s' (%s)" % (
|
||||
field, config_filename,
|
||||
))
|
||||
|
||||
# 'url' must either be a string or explicitly null, not missing
|
||||
# to avoid accidentally turning off push for ASes.
|
||||
if (not isinstance(as_info.get("url"), basestring) and
|
||||
if (not isinstance(as_info.get("url"), string_types) and
|
||||
as_info.get("url", "") is not None):
|
||||
raise KeyError(
|
||||
"Required string field or explicit null: 'url' (%s)" % (config_filename,)
|
||||
@@ -128,7 +130,7 @@ def _load_appservice(hostname, as_info, config_filename):
|
||||
"Expected namespace entry in %s to be an object,"
|
||||
" but got %s", ns, regex_obj
|
||||
)
|
||||
if not isinstance(regex_obj.get("regex"), basestring):
|
||||
if not isinstance(regex_obj.get("regex"), string_types):
|
||||
raise ValueError(
|
||||
"Missing/bad type 'regex' key in %s", regex_obj
|
||||
)
|
||||
|
||||
@@ -65,7 +65,7 @@ class FederationServer(FederationBase):
|
||||
|
||||
# We cache responses to state queries, as they take a while and often
|
||||
# come in waves.
|
||||
self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
|
||||
self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
|
||||
@@ -169,7 +169,7 @@ class TransactionQueue(object):
|
||||
while True:
|
||||
last_token = yield self.store.get_federation_out_pos("events")
|
||||
next_token, events = yield self.store.get_all_new_events_stream(
|
||||
last_token, self._last_poked_id, limit=20,
|
||||
last_token, self._last_poked_id, limit=100,
|
||||
)
|
||||
|
||||
logger.debug("Handling %s -> %s", last_token, next_token)
|
||||
@@ -177,24 +177,33 @@ class TransactionQueue(object):
|
||||
if not events and next_token >= self._last_poked_id:
|
||||
break
|
||||
|
||||
for event in events:
|
||||
@defer.inlineCallbacks
|
||||
def handle_event(event):
|
||||
# Only send events for this server.
|
||||
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
|
||||
is_mine = self.is_mine_id(event.event_id)
|
||||
if not is_mine and send_on_behalf_of is None:
|
||||
continue
|
||||
return
|
||||
|
||||
try:
|
||||
# Get the state from before the event.
|
||||
# We need to make sure that this is the state from before
|
||||
# the event and not from after it.
|
||||
# Otherwise if the last member on a server in a room is
|
||||
# banned then it won't receive the event because it won't
|
||||
# be in the room after the ban.
|
||||
destinations = yield self.state.get_current_hosts_in_room(
|
||||
event.room_id, latest_event_ids=[
|
||||
prev_id for prev_id, _ in event.prev_events
|
||||
],
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to calculate hosts in room for event: %s",
|
||||
event.event_id,
|
||||
)
|
||||
return
|
||||
|
||||
# Get the state from before the event.
|
||||
# We need to make sure that this is the state from before
|
||||
# the event and not from after it.
|
||||
# Otherwise if the last member on a server in a room is
|
||||
# banned then it won't receive the event because it won't
|
||||
# be in the room after the ban.
|
||||
destinations = yield self.state.get_current_hosts_in_room(
|
||||
event.room_id, latest_event_ids=[
|
||||
prev_id for prev_id, _ in event.prev_events
|
||||
],
|
||||
)
|
||||
destinations = set(destinations)
|
||||
|
||||
if send_on_behalf_of is not None:
|
||||
@@ -207,12 +216,44 @@ class TransactionQueue(object):
|
||||
|
||||
self._send_pdu(event, destinations)
|
||||
|
||||
events_processed_counter.inc_by(len(events))
|
||||
@defer.inlineCallbacks
|
||||
def handle_room_events(events):
|
||||
for event in events:
|
||||
yield handle_event(event)
|
||||
|
||||
events_by_room = {}
|
||||
for event in events:
|
||||
events_by_room.setdefault(event.room_id, []).append(event)
|
||||
|
||||
yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||
[
|
||||
logcontext.run_in_background(handle_room_events, evs)
|
||||
for evs in events_by_room.itervalues()
|
||||
],
|
||||
consumeErrors=True
|
||||
))
|
||||
|
||||
yield self.store.update_federation_out_pos(
|
||||
"events", next_token
|
||||
)
|
||||
|
||||
if events:
|
||||
now = self.clock.time_msec()
|
||||
ts = yield self.store.get_received_ts(events[-1].event_id)
|
||||
|
||||
synapse.metrics.event_processing_lag.set(
|
||||
now - ts, "federation_sender",
|
||||
)
|
||||
synapse.metrics.event_processing_last_ts.set(
|
||||
ts, "federation_sender",
|
||||
)
|
||||
|
||||
events_processed_counter.inc_by(len(events))
|
||||
|
||||
synapse.metrics.event_processing_positions.set(
|
||||
next_token, "federation_sender",
|
||||
)
|
||||
|
||||
finally:
|
||||
self._is_processing = False
|
||||
|
||||
@@ -254,6 +295,7 @@ class TransactionQueue(object):
|
||||
Args:
|
||||
states (list(UserPresenceState))
|
||||
"""
|
||||
return
|
||||
|
||||
# First we queue up the new presence by user ID, so multiple presence
|
||||
# updates in quick successtion are correctly handled
|
||||
|
||||
@@ -18,7 +18,9 @@ from twisted.internet import defer
|
||||
import synapse
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
||||
from synapse.util.logcontext import (
|
||||
make_deferred_yieldable, preserve_fn, run_in_background,
|
||||
)
|
||||
|
||||
import logging
|
||||
|
||||
@@ -84,11 +86,16 @@ class ApplicationServicesHandler(object):
|
||||
if not events:
|
||||
break
|
||||
|
||||
events_by_room = {}
|
||||
for event in events:
|
||||
events_by_room.setdefault(event.room_id, []).append(event)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_event(event):
|
||||
# Gather interested services
|
||||
services = yield self._get_services_for_event(event)
|
||||
if len(services) == 0:
|
||||
continue # no services need notifying
|
||||
return # no services need notifying
|
||||
|
||||
# Do we know this user exists? If not, poke the user
|
||||
# query API for all services which match that user regex.
|
||||
@@ -108,9 +115,33 @@ class ApplicationServicesHandler(object):
|
||||
service, event
|
||||
)
|
||||
|
||||
events_processed_counter.inc_by(len(events))
|
||||
@defer.inlineCallbacks
|
||||
def handle_room_events(events):
|
||||
for event in events:
|
||||
yield handle_event(event)
|
||||
|
||||
yield make_deferred_yieldable(defer.gatherResults([
|
||||
run_in_background(handle_room_events, evs)
|
||||
for evs in events_by_room.itervalues()
|
||||
], consumeErrors=True))
|
||||
|
||||
yield self.store.set_appservice_last_pos(upper_bound)
|
||||
|
||||
now = self.clock.time_msec()
|
||||
ts = yield self.store.get_received_ts(events[-1].event_id)
|
||||
|
||||
synapse.metrics.event_processing_positions.set(
|
||||
upper_bound, "appservice_sender",
|
||||
)
|
||||
|
||||
events_processed_counter.inc_by(len(events))
|
||||
|
||||
synapse.metrics.event_processing_lag.set(
|
||||
now - ts, "appservice_sender",
|
||||
)
|
||||
synapse.metrics.event_processing_last_ts.set(
|
||||
ts, "appservice_sender",
|
||||
)
|
||||
finally:
|
||||
self.is_processing = False
|
||||
|
||||
|
||||
@@ -372,6 +372,7 @@ class InitialSyncHandler(BaseHandler):
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_presence():
|
||||
defer.returnValue([])
|
||||
states = yield presence_handler.get_states(
|
||||
[m.user_id for m in room_members],
|
||||
as_event=True,
|
||||
|
||||
@@ -384,7 +384,7 @@ class MessageHandler(BaseHandler):
|
||||
# If this is an AS, double check that they are allowed to see the members.
|
||||
# This can either be because the AS user is in the room or becuase there
|
||||
# is a user in the room that the AS is "interested in"
|
||||
if requester.app_service and user_id not in users_with_profile:
|
||||
if False and requester.app_service and user_id not in users_with_profile:
|
||||
for uid in users_with_profile:
|
||||
if requester.app_service.is_interested_in_user(uid):
|
||||
break
|
||||
@@ -454,40 +454,39 @@ class EventCreationHandler(object):
|
||||
"""
|
||||
builder = self.event_builder_factory.new(event_dict)
|
||||
|
||||
with (yield self.limiter.queue(builder.room_id)):
|
||||
self.validator.validate_new(builder)
|
||||
self.validator.validate_new(builder)
|
||||
|
||||
if builder.type == EventTypes.Member:
|
||||
membership = builder.content.get("membership", None)
|
||||
target = UserID.from_string(builder.state_key)
|
||||
if builder.type == EventTypes.Member:
|
||||
membership = builder.content.get("membership", None)
|
||||
target = UserID.from_string(builder.state_key)
|
||||
|
||||
if membership in {Membership.JOIN, Membership.INVITE}:
|
||||
# If event doesn't include a display name, add one.
|
||||
profile = self.profile_handler
|
||||
content = builder.content
|
||||
if membership in {Membership.JOIN, Membership.INVITE}:
|
||||
# If event doesn't include a display name, add one.
|
||||
profile = self.profile_handler
|
||||
content = builder.content
|
||||
|
||||
try:
|
||||
if "displayname" not in content:
|
||||
content["displayname"] = yield profile.get_displayname(target)
|
||||
if "avatar_url" not in content:
|
||||
content["avatar_url"] = yield profile.get_avatar_url(target)
|
||||
except Exception as e:
|
||||
logger.info(
|
||||
"Failed to get profile information for %r: %s",
|
||||
target, e
|
||||
)
|
||||
try:
|
||||
if "displayname" not in content:
|
||||
content["displayname"] = yield profile.get_displayname(target)
|
||||
if "avatar_url" not in content:
|
||||
content["avatar_url"] = yield profile.get_avatar_url(target)
|
||||
except Exception as e:
|
||||
logger.info(
|
||||
"Failed to get profile information for %r: %s",
|
||||
target, e
|
||||
)
|
||||
|
||||
if token_id is not None:
|
||||
builder.internal_metadata.token_id = token_id
|
||||
if token_id is not None:
|
||||
builder.internal_metadata.token_id = token_id
|
||||
|
||||
if txn_id is not None:
|
||||
builder.internal_metadata.txn_id = txn_id
|
||||
if txn_id is not None:
|
||||
builder.internal_metadata.txn_id = txn_id
|
||||
|
||||
event, context = yield self.create_new_client_event(
|
||||
builder=builder,
|
||||
requester=requester,
|
||||
prev_event_ids=prev_event_ids,
|
||||
)
|
||||
event, context = yield self.create_new_client_event(
|
||||
builder=builder,
|
||||
requester=requester,
|
||||
prev_event_ids=prev_event_ids,
|
||||
)
|
||||
|
||||
defer.returnValue((event, context))
|
||||
|
||||
@@ -557,27 +556,34 @@ class EventCreationHandler(object):
|
||||
|
||||
See self.create_event and self.send_nonmember_event.
|
||||
"""
|
||||
event, context = yield self.create_event(
|
||||
requester,
|
||||
event_dict,
|
||||
token_id=requester.access_token_id,
|
||||
txn_id=txn_id
|
||||
)
|
||||
|
||||
spam_error = self.spam_checker.check_event_for_spam(event)
|
||||
if spam_error:
|
||||
if not isinstance(spam_error, basestring):
|
||||
spam_error = "Spam is not permitted here"
|
||||
raise SynapseError(
|
||||
403, spam_error, Codes.FORBIDDEN
|
||||
# We limit the number of concurrent event sends in a room so that we
|
||||
# don't fork the DAG too much. If we don't limit then we can end up in
|
||||
# a situation where event persistence can't keep up, causing
|
||||
# extremities to pile up, which in turn leads to state resolution
|
||||
# taking longer.
|
||||
with (yield self.limiter.queue(event_dict["room_id"])):
|
||||
event, context = yield self.create_event(
|
||||
requester,
|
||||
event_dict,
|
||||
token_id=requester.access_token_id,
|
||||
txn_id=txn_id
|
||||
)
|
||||
|
||||
yield self.send_nonmember_event(
|
||||
requester,
|
||||
event,
|
||||
context,
|
||||
ratelimit=ratelimit,
|
||||
)
|
||||
spam_error = self.spam_checker.check_event_for_spam(event)
|
||||
if spam_error:
|
||||
if not isinstance(spam_error, basestring):
|
||||
spam_error = "Spam is not permitted here"
|
||||
raise SynapseError(
|
||||
403, spam_error, Codes.FORBIDDEN
|
||||
)
|
||||
|
||||
yield self.send_nonmember_event(
|
||||
requester,
|
||||
event,
|
||||
context,
|
||||
ratelimit=ratelimit,
|
||||
)
|
||||
defer.returnValue(event)
|
||||
|
||||
@measure_func("create_new_client_event")
|
||||
|
||||
@@ -373,6 +373,7 @@ class PresenceHandler(object):
|
||||
"""We've seen the user do something that indicates they're interacting
|
||||
with the app.
|
||||
"""
|
||||
return
|
||||
user_id = user.to_string()
|
||||
|
||||
bump_active_time_counter.inc()
|
||||
@@ -402,6 +403,7 @@ class PresenceHandler(object):
|
||||
Useful for streams that are not associated with an actual
|
||||
client that is being used by a user.
|
||||
"""
|
||||
affect_presence = False
|
||||
if affect_presence:
|
||||
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
|
||||
self.user_to_num_current_syncs[user_id] = curr_sync + 1
|
||||
@@ -444,6 +446,8 @@ class PresenceHandler(object):
|
||||
Returns:
|
||||
set(str): A set of user_id strings.
|
||||
"""
|
||||
# presence is disabled on matrix.org, so we return the empty set
|
||||
return set()
|
||||
syncing_user_ids = {
|
||||
user_id for user_id, count in self.user_to_num_current_syncs.items()
|
||||
if count
|
||||
@@ -463,6 +467,7 @@ class PresenceHandler(object):
|
||||
syncing_user_ids(set(str)): The set of user_ids that are
|
||||
currently syncing on that server.
|
||||
"""
|
||||
return
|
||||
|
||||
# Grab the previous list of user_ids that were syncing on that process
|
||||
prev_syncing_user_ids = (
|
||||
|
||||
@@ -23,7 +23,7 @@ from synapse.api.errors import (
|
||||
)
|
||||
from synapse.http.client import CaptchaServerHttpClient
|
||||
from synapse import types
|
||||
from synapse.types import UserID
|
||||
from synapse.types import UserID, create_requester, RoomID, RoomAlias
|
||||
from synapse.util.async import run_on_reactor, Linearizer
|
||||
from synapse.util.threepids import check_3pid_allowed
|
||||
from ._base import BaseHandler
|
||||
@@ -205,10 +205,17 @@ class RegistrationHandler(BaseHandler):
|
||||
token = None
|
||||
attempts += 1
|
||||
|
||||
# auto-join the user to any rooms we're supposed to dump them into
|
||||
fake_requester = create_requester(user_id)
|
||||
for r in self.hs.config.auto_join_rooms:
|
||||
try:
|
||||
yield self._join_user_to_room(fake_requester, r)
|
||||
except Exception as e:
|
||||
logger.error("Failed to join new user to %r: %r", r, e)
|
||||
|
||||
# We used to generate default identicons here, but nowadays
|
||||
# we want clients to generate their own as part of their branding
|
||||
# rather than there being consistent matrix-wide ones, so we don't.
|
||||
|
||||
defer.returnValue((user_id, token))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@@ -483,3 +490,28 @@ class RegistrationHandler(BaseHandler):
|
||||
)
|
||||
|
||||
defer.returnValue((user_id, access_token))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _join_user_to_room(self, requester, room_identifier):
|
||||
room_id = None
|
||||
room_member_handler = self.hs.get_room_member_handler()
|
||||
if RoomID.is_valid(room_identifier):
|
||||
room_id = room_identifier
|
||||
elif RoomAlias.is_valid(room_identifier):
|
||||
room_alias = RoomAlias.from_string(room_identifier)
|
||||
room_id, remote_room_hosts = (
|
||||
yield room_member_handler.lookup_room_alias(room_alias)
|
||||
)
|
||||
room_id = room_id.to_string()
|
||||
else:
|
||||
raise SynapseError(400, "%s was not legal room ID or room alias" % (
|
||||
room_identifier,
|
||||
))
|
||||
|
||||
yield room_member_handler.update_membership(
|
||||
requester=requester,
|
||||
target=requester.user,
|
||||
room_id=room_id,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
action="join",
|
||||
)
|
||||
|
||||
@@ -44,8 +44,9 @@ EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
|
||||
class RoomListHandler(BaseHandler):
|
||||
def __init__(self, hs):
|
||||
super(RoomListHandler, self).__init__(hs)
|
||||
self.response_cache = ResponseCache(hs)
|
||||
self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
|
||||
self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
|
||||
self.remote_response_cache = ResponseCache(hs, "remote_room_list",
|
||||
timeout_ms=30 * 1000)
|
||||
|
||||
def get_local_public_room_list(self, limit=None, since_token=None,
|
||||
search_filter=None,
|
||||
|
||||
@@ -28,7 +28,7 @@ from synapse.api.constants import (
|
||||
)
|
||||
from synapse.api.errors import AuthError, SynapseError, Codes
|
||||
from synapse.types import UserID, RoomID
|
||||
from synapse.util.async import Linearizer
|
||||
from synapse.util.async import Linearizer, Limiter
|
||||
from synapse.util.distributor import user_left_room, user_joined_room
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ class RoomMemberHandler(object):
|
||||
self.event_creation_hander = hs.get_event_creation_handler()
|
||||
|
||||
self.member_linearizer = Linearizer(name="member")
|
||||
self.member_limiter = Limiter(20)
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.spam_checker = hs.get_spam_checker()
|
||||
@@ -232,18 +233,23 @@ class RoomMemberHandler(object):
|
||||
):
|
||||
key = (room_id,)
|
||||
|
||||
with (yield self.member_linearizer.queue(key)):
|
||||
result = yield self._update_membership(
|
||||
requester,
|
||||
target,
|
||||
room_id,
|
||||
action,
|
||||
txn_id=txn_id,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
third_party_signed=third_party_signed,
|
||||
ratelimit=ratelimit,
|
||||
content=content,
|
||||
)
|
||||
as_id = object()
|
||||
if requester.app_service:
|
||||
as_id = requester.app_service.id
|
||||
|
||||
with (yield self.member_limiter.queue(as_id)):
|
||||
with (yield self.member_linearizer.queue(key)):
|
||||
result = yield self._update_membership(
|
||||
requester,
|
||||
target,
|
||||
room_id,
|
||||
action,
|
||||
txn_id=txn_id,
|
||||
remote_room_hosts=remote_room_hosts,
|
||||
third_party_signed=third_party_signed,
|
||||
ratelimit=ratelimit,
|
||||
content=content,
|
||||
)
|
||||
|
||||
defer.returnValue(result)
|
||||
|
||||
@@ -852,6 +858,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
||||
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
|
||||
"""Implements RoomMemberHandler._remote_join
|
||||
"""
|
||||
# filter ourselves out of remote_room_hosts: do_invite_join ignores it
|
||||
# and if it is the only entry we'd like to return a 404 rather than a
|
||||
# 500.
|
||||
|
||||
remote_room_hosts = [
|
||||
host for host in remote_room_hosts if host != self.hs.hostname
|
||||
]
|
||||
|
||||
if len(remote_room_hosts) == 0:
|
||||
raise SynapseError(404, "No known servers")
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ import itertools
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
|
||||
|
||||
SyncConfig = collections.namedtuple("SyncConfig", [
|
||||
"user",
|
||||
@@ -169,7 +170,10 @@ class SyncHandler(object):
|
||||
self.presence_handler = hs.get_presence_handler()
|
||||
self.event_sources = hs.get_event_sources()
|
||||
self.clock = hs.get_clock()
|
||||
self.response_cache = ResponseCache(hs)
|
||||
self.response_cache = ResponseCache(
|
||||
hs, "sync",
|
||||
timeout_ms=SYNC_RESPONSE_CACHE_MS,
|
||||
)
|
||||
self.state = hs.get_state_handler()
|
||||
|
||||
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
|
||||
@@ -599,7 +603,7 @@ class SyncHandler(object):
|
||||
since_token is None and
|
||||
sync_config.filter_collection.blocks_all_presence()
|
||||
)
|
||||
if not block_all_presence_data:
|
||||
if False and not block_all_presence_data:
|
||||
yield self._generate_sync_entry_for_presence(
|
||||
sync_result_builder, newly_joined_rooms, newly_joined_users
|
||||
)
|
||||
|
||||
@@ -12,8 +12,6 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import socket
|
||||
|
||||
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
|
||||
from twisted.internet import defer, reactor
|
||||
from twisted.internet.error import ConnectError
|
||||
@@ -33,7 +31,7 @@ SERVER_CACHE = {}
|
||||
|
||||
# our record of an individual server which can be tried to reach a destination.
|
||||
#
|
||||
# "host" is actually a dotted-quad or ipv6 address string. Except when there's
|
||||
# "host" is the hostname acquired from the SRV record. Except when there's
|
||||
# no SRV record, in which case it is the original hostname.
|
||||
_Server = collections.namedtuple(
|
||||
"_Server", "priority weight host port expires"
|
||||
@@ -297,20 +295,13 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
|
||||
|
||||
payload = answer.payload
|
||||
|
||||
hosts = yield _get_hosts_for_srv_record(
|
||||
dns_client, str(payload.target)
|
||||
)
|
||||
|
||||
for (ip, ttl) in hosts:
|
||||
host_ttl = min(answer.ttl, ttl)
|
||||
|
||||
servers.append(_Server(
|
||||
host=ip,
|
||||
port=int(payload.port),
|
||||
priority=int(payload.priority),
|
||||
weight=int(payload.weight),
|
||||
expires=int(clock.time()) + host_ttl,
|
||||
))
|
||||
servers.append(_Server(
|
||||
host=str(payload.target),
|
||||
port=int(payload.port),
|
||||
priority=int(payload.priority),
|
||||
weight=int(payload.weight),
|
||||
expires=int(clock.time()) + answer.ttl,
|
||||
))
|
||||
|
||||
servers.sort()
|
||||
cache[service_name] = list(servers)
|
||||
@@ -328,81 +319,3 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
|
||||
raise e
|
||||
|
||||
defer.returnValue(servers)
|
||||
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_hosts_for_srv_record(dns_client, host):
|
||||
"""Look up each of the hosts in a SRV record
|
||||
|
||||
Args:
|
||||
dns_client (twisted.names.dns.IResolver):
|
||||
host (basestring): host to look up
|
||||
|
||||
Returns:
|
||||
Deferred[list[(str, int)]]: a list of (host, ttl) pairs
|
||||
|
||||
"""
|
||||
ip4_servers = []
|
||||
ip6_servers = []
|
||||
|
||||
def cb(res):
|
||||
# lookupAddress and lookupIP6Address return a three-tuple
|
||||
# giving the answer, authority, and additional sections of the
|
||||
# response.
|
||||
#
|
||||
# we only care about the answers.
|
||||
|
||||
return res[0]
|
||||
|
||||
def eb(res, record_type):
|
||||
if res.check(DNSNameError):
|
||||
return []
|
||||
logger.warn("Error looking up %s for %s: %s", record_type, host, res)
|
||||
return res
|
||||
|
||||
# no logcontexts here, so we can safely fire these off and gatherResults
|
||||
d1 = dns_client.lookupAddress(host).addCallbacks(
|
||||
cb, eb, errbackArgs=("A", ))
|
||||
d2 = dns_client.lookupIPV6Address(host).addCallbacks(
|
||||
cb, eb, errbackArgs=("AAAA", ))
|
||||
results = yield defer.DeferredList(
|
||||
[d1, d2], consumeErrors=True)
|
||||
|
||||
# if all of the lookups failed, raise an exception rather than blowing out
|
||||
# the cache with an empty result.
|
||||
if results and all(s == defer.FAILURE for (s, _) in results):
|
||||
defer.returnValue(results[0][1])
|
||||
|
||||
for (success, result) in results:
|
||||
if success == defer.FAILURE:
|
||||
continue
|
||||
|
||||
for answer in result:
|
||||
if not answer.payload:
|
||||
continue
|
||||
|
||||
try:
|
||||
if answer.type == dns.A:
|
||||
ip = answer.payload.dottedQuad()
|
||||
ip4_servers.append((ip, answer.ttl))
|
||||
elif answer.type == dns.AAAA:
|
||||
ip = socket.inet_ntop(
|
||||
socket.AF_INET6, answer.payload.address,
|
||||
)
|
||||
ip6_servers.append((ip, answer.ttl))
|
||||
else:
|
||||
# the most likely candidate here is a CNAME record.
|
||||
# rfc2782 says srvs may not point to aliases.
|
||||
logger.warn(
|
||||
"Ignoring unexpected DNS record type %s for %s",
|
||||
answer.type, host,
|
||||
)
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.warn("Ignoring invalid DNS response for %s: %s",
|
||||
host, e)
|
||||
continue
|
||||
|
||||
# keep the ipv4 results before the ipv6 results, mostly to match historical
|
||||
# behaviour.
|
||||
defer.returnValue(ip4_servers + ip6_servers)
|
||||
|
||||
@@ -329,7 +329,7 @@ class JsonResource(HttpServer, resource.Resource):
|
||||
register_paths, so will return (possibly via Deferred) either
|
||||
None, or a tuple of (http code, response body).
|
||||
"""
|
||||
if request.method == "OPTIONS":
|
||||
if request.method == b"OPTIONS":
|
||||
return _options_handler, {}
|
||||
|
||||
# Loop through all the registered callbacks to check if the method
|
||||
@@ -543,7 +543,7 @@ def finish_request(request):
|
||||
|
||||
def _request_user_agent_is_curl(request):
|
||||
user_agents = request.requestHeaders.getRawHeaders(
|
||||
"User-Agent", default=[]
|
||||
b"User-Agent", default=[]
|
||||
)
|
||||
for user_agent in user_agents:
|
||||
if "curl" in user_agent:
|
||||
|
||||
@@ -20,7 +20,7 @@ import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
|
||||
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
|
||||
|
||||
|
||||
class SynapseRequest(Request):
|
||||
@@ -43,12 +43,12 @@ class SynapseRequest(Request):
|
||||
|
||||
def get_redacted_uri(self):
|
||||
return ACCESS_TOKEN_RE.sub(
|
||||
r'\1<redacted>\3',
|
||||
br'\1<redacted>\3',
|
||||
self.uri
|
||||
)
|
||||
|
||||
def get_user_agent(self):
|
||||
return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
|
||||
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
|
||||
|
||||
def started_processing(self):
|
||||
self.site.access_logger.info(
|
||||
|
||||
@@ -17,12 +17,13 @@ import logging
|
||||
import functools
|
||||
import time
|
||||
import gc
|
||||
import platform
|
||||
|
||||
from twisted.internet import reactor
|
||||
|
||||
from .metric import (
|
||||
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
|
||||
MemoryUsageMetric,
|
||||
MemoryUsageMetric, GaugeMetric,
|
||||
)
|
||||
from .process_collector import register_process_collector
|
||||
|
||||
@@ -30,6 +31,7 @@ from .process_collector import register_process_collector
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
running_on_pypy = platform.python_implementation() == 'PyPy'
|
||||
all_metrics = []
|
||||
all_collectors = []
|
||||
|
||||
@@ -63,6 +65,13 @@ class Metrics(object):
|
||||
"""
|
||||
return self._register(CounterMetric, *args, **kwargs)
|
||||
|
||||
def register_gauge(self, *args, **kwargs):
|
||||
"""
|
||||
Returns:
|
||||
GaugeMetric
|
||||
"""
|
||||
return self._register(GaugeMetric, *args, **kwargs)
|
||||
|
||||
def register_callback(self, *args, **kwargs):
|
||||
"""
|
||||
Returns:
|
||||
@@ -142,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
|
||||
tick_time = reactor_metrics.register_distribution("tick_time")
|
||||
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
|
||||
|
||||
synapse_metrics = get_metrics_for("synapse")
|
||||
|
||||
# Used to track where various components have processed in the event stream,
|
||||
# e.g. federation sending, appservice sending, etc.
|
||||
event_processing_positions = synapse_metrics.register_gauge(
|
||||
"event_processing_positions", labels=["name"],
|
||||
)
|
||||
|
||||
# Used to track the current max events stream position
|
||||
event_persisted_position = synapse_metrics.register_gauge(
|
||||
"event_persisted_position",
|
||||
)
|
||||
|
||||
# Used to track the received_ts of the last event processed by various
|
||||
# components
|
||||
event_processing_last_ts = synapse_metrics.register_gauge(
|
||||
"event_processing_last_ts", labels=["name"],
|
||||
)
|
||||
|
||||
# Used to track the lag processing events. This is the time difference
|
||||
# between the last processed event's received_ts and the time it was
|
||||
# finished being processed.
|
||||
event_processing_lag = synapse_metrics.register_gauge(
|
||||
"event_processing_lag", labels=["name"],
|
||||
)
|
||||
|
||||
|
||||
def runUntilCurrentTimer(func):
|
||||
|
||||
@@ -174,6 +209,9 @@ def runUntilCurrentTimer(func):
|
||||
tick_time.inc_by(end - start)
|
||||
pending_calls_metric.inc_by(num_pending)
|
||||
|
||||
if running_on_pypy:
|
||||
return ret
|
||||
|
||||
# Check if we need to do a manual GC (since its been disabled), and do
|
||||
# one if necessary.
|
||||
threshold = gc.get_threshold()
|
||||
@@ -206,6 +244,7 @@ try:
|
||||
|
||||
# We manually run the GC each reactor tick so that we can get some metrics
|
||||
# about time spent doing GC,
|
||||
gc.disable()
|
||||
if not running_on_pypy:
|
||||
gc.disable()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
@@ -115,7 +115,7 @@ class CounterMetric(BaseMetric):
|
||||
# dict[list[str]]: value for each set of label values. the keys are the
|
||||
# label values, in the same order as the labels in self.labels.
|
||||
#
|
||||
# (if the metric is a scalar, the (single) key is the empty list).
|
||||
# (if the metric is a scalar, the (single) key is the empty tuple).
|
||||
self.counts = {}
|
||||
|
||||
# Scalar metrics are never empty
|
||||
@@ -145,6 +145,36 @@ class CounterMetric(BaseMetric):
|
||||
)
|
||||
|
||||
|
||||
class GaugeMetric(BaseMetric):
|
||||
"""A metric that can go up or down
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(GaugeMetric, self).__init__(*args, **kwargs)
|
||||
|
||||
# dict[list[str]]: value for each set of label values. the keys are the
|
||||
# label values, in the same order as the labels in self.labels.
|
||||
#
|
||||
# (if the metric is a scalar, the (single) key is the empty tuple).
|
||||
self.guages = {}
|
||||
|
||||
def set(self, v, *values):
|
||||
if len(values) != self.dimension():
|
||||
raise ValueError(
|
||||
"Expected as many values to inc() as labels (%d)" % (self.dimension())
|
||||
)
|
||||
|
||||
# TODO: should assert that the tag values are all strings
|
||||
|
||||
self.guages[values] = v
|
||||
|
||||
def render(self):
|
||||
return flatten(
|
||||
self._render_for_labels(k, self.guages[k])
|
||||
for k in sorted(self.guages.keys())
|
||||
)
|
||||
|
||||
|
||||
class CallbackMetric(BaseMetric):
|
||||
"""A metric that returns the numeric value returned by a callback whenever
|
||||
it is rendered. Typically this is used to implement gauges that yield the
|
||||
|
||||
@@ -115,7 +115,7 @@ class ReplicationSendEventRestServlet(RestServlet):
|
||||
self.clock = hs.get_clock()
|
||||
|
||||
# The responses are tiny, so we may as well cache them for a while
|
||||
self.response_cache = ResponseCache(hs, timeout_ms=30 * 60 * 1000)
|
||||
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
|
||||
|
||||
def on_PUT(self, request, event_id):
|
||||
result = self.response_cache.get(event_id)
|
||||
|
||||
@@ -42,6 +42,8 @@ class SlavedClientIpStore(BaseSlavedStore):
|
||||
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
|
||||
return
|
||||
|
||||
self.client_ip_last_seen.prefill(key, now)
|
||||
|
||||
self.hs.get_tcp_replication().send_user_ip(
|
||||
user_id, access_token, ip, user_agent, device_id, now
|
||||
)
|
||||
|
||||
@@ -33,7 +33,7 @@ import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
MAX_EVENTS_BEHIND = 10000
|
||||
MAX_EVENTS_BEHIND = 500000
|
||||
|
||||
|
||||
EventStreamRow = namedtuple("EventStreamRow", (
|
||||
|
||||
@@ -296,17 +296,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
|
||||
)
|
||||
new_room_id = info["room_id"]
|
||||
|
||||
yield self.event_creation_handler.create_and_send_nonmember_event(
|
||||
room_creator_requester,
|
||||
{
|
||||
"type": "m.room.message",
|
||||
"content": {"body": message, "msgtype": "m.text"},
|
||||
"room_id": new_room_id,
|
||||
"sender": new_room_user_id,
|
||||
},
|
||||
ratelimit=False,
|
||||
)
|
||||
|
||||
requester_user_id = requester.user.to_string()
|
||||
|
||||
logger.info("Shutting down room %r", room_id)
|
||||
@@ -344,6 +333,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
|
||||
|
||||
kicked_users.append(user_id)
|
||||
|
||||
yield self.event_creation_handler.create_and_send_nonmember_event(
|
||||
room_creator_requester,
|
||||
{
|
||||
"type": "m.room.message",
|
||||
"content": {"body": message, "msgtype": "m.text"},
|
||||
"room_id": new_room_id,
|
||||
"sender": new_room_user_id,
|
||||
},
|
||||
ratelimit=False,
|
||||
)
|
||||
|
||||
aliases_for_room = yield self.store.get_aliases_for_room(room_id)
|
||||
|
||||
yield self.store.update_aliases_for_room(
|
||||
|
||||
@@ -44,7 +44,10 @@ class LogoutRestServlet(ClientV1RestServlet):
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
except AuthError:
|
||||
# this implies the access token has already been deleted.
|
||||
pass
|
||||
defer.returnValue((401, {
|
||||
"errcode": "M_UNKNOWN_TOKEN",
|
||||
"error": "Access Token unknown or expired"
|
||||
}))
|
||||
else:
|
||||
if requester.device_id is None:
|
||||
# the acccess token wasn't associated with a device.
|
||||
|
||||
@@ -81,7 +81,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
|
||||
except Exception:
|
||||
raise SynapseError(400, "Unable to parse state")
|
||||
|
||||
yield self.presence_handler.set_state(user, state)
|
||||
# yield self.presence_handler.set_state(user, state)
|
||||
|
||||
defer.returnValue((200, {}))
|
||||
|
||||
|
||||
@@ -348,9 +348,9 @@ class RegisterRestServlet(ClientV1RestServlet):
|
||||
admin = register_json.get("admin", None)
|
||||
|
||||
# Its important to check as we use null bytes as HMAC field separators
|
||||
if "\x00" in user:
|
||||
if b"\x00" in user:
|
||||
raise SynapseError(400, "Invalid user")
|
||||
if "\x00" in password:
|
||||
if b"\x00" in password:
|
||||
raise SynapseError(400, "Invalid password")
|
||||
|
||||
# str() because otherwise hmac complains that 'unicode' does not
|
||||
|
||||
@@ -165,17 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
|
||||
content=content,
|
||||
)
|
||||
else:
|
||||
event, context = yield self.event_creation_hander.create_event(
|
||||
event = yield self.event_creation_hander.create_and_send_nonmember_event(
|
||||
requester,
|
||||
event_dict,
|
||||
token_id=requester.access_token_id,
|
||||
txn_id=txn_id,
|
||||
)
|
||||
|
||||
yield self.event_creation_hander.send_nonmember_event(
|
||||
requester, event, context,
|
||||
)
|
||||
|
||||
ret = {}
|
||||
if event:
|
||||
ret = {"event_id": event.event_id}
|
||||
|
||||
@@ -20,7 +20,6 @@ import synapse
|
||||
import synapse.types
|
||||
from synapse.api.auth import get_access_token_from_request, has_access_token
|
||||
from synapse.api.constants import LoginType
|
||||
from synapse.types import RoomID, RoomAlias
|
||||
from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
|
||||
from synapse.http.servlet import (
|
||||
RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string
|
||||
@@ -405,14 +404,6 @@ class RegisterRestServlet(RestServlet):
|
||||
generate_token=False,
|
||||
)
|
||||
|
||||
# auto-join the user to any rooms we're supposed to dump them into
|
||||
fake_requester = synapse.types.create_requester(registered_user_id)
|
||||
for r in self.hs.config.auto_join_rooms:
|
||||
try:
|
||||
yield self._join_user_to_room(fake_requester, r)
|
||||
except Exception as e:
|
||||
logger.error("Failed to join new user to %r: %r", r, e)
|
||||
|
||||
# remember that we've now registered that user account, and with
|
||||
# what user ID (since the user may not have specified)
|
||||
self.auth_handler.set_session_data(
|
||||
@@ -445,29 +436,6 @@ class RegisterRestServlet(RestServlet):
|
||||
def on_OPTIONS(self, _):
|
||||
return 200, {}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _join_user_to_room(self, requester, room_identifier):
|
||||
room_id = None
|
||||
if RoomID.is_valid(room_identifier):
|
||||
room_id = room_identifier
|
||||
elif RoomAlias.is_valid(room_identifier):
|
||||
room_alias = RoomAlias.from_string(room_identifier)
|
||||
room_id, remote_room_hosts = (
|
||||
yield self.room_member_handler.lookup_room_alias(room_alias)
|
||||
)
|
||||
room_id = room_id.to_string()
|
||||
else:
|
||||
raise SynapseError(400, "%s was not legal room ID or room alias" % (
|
||||
room_identifier,
|
||||
))
|
||||
|
||||
yield self.room_member_handler.update_membership(
|
||||
requester=requester,
|
||||
target=requester.user,
|
||||
room_id=room_id,
|
||||
action="join",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_appservice_registration(self, username, as_token, body):
|
||||
user_id = yield self.registration_handler.appservice_register(
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
from twisted.internet import defer, threads
|
||||
from twisted.protocols.basic import FileSender
|
||||
|
||||
import six
|
||||
|
||||
from ._base import Responder
|
||||
|
||||
from synapse.util.file_consumer import BackgroundFileConsumer
|
||||
@@ -119,7 +121,7 @@ class MediaStorage(object):
|
||||
os.remove(fname)
|
||||
except Exception:
|
||||
pass
|
||||
raise t, v, tb
|
||||
six.reraise(t, v, tb)
|
||||
|
||||
if not finished_called:
|
||||
raise Exception("Finished callback not called")
|
||||
|
||||
@@ -266,16 +266,16 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||
def count_r30_users(self):
|
||||
"""
|
||||
Counts the number of 30 day retained users, defined as:-
|
||||
* Users who have created their accounts more than 30 days
|
||||
* Users who have created their accounts more than 30 days ago
|
||||
* Where last seen at most 30 days ago
|
||||
* Where account creation and last_seen are > 30 days
|
||||
* Where account creation and last_seen are > 30 days apart
|
||||
|
||||
Returns counts globaly for a given user as well as breaking
|
||||
by platform
|
||||
"""
|
||||
def _count_r30_users(txn):
|
||||
thirty_days_in_secs = 86400 * 30
|
||||
now = int(self._clock.time_msec())
|
||||
now = int(self._clock.time())
|
||||
thirty_days_ago_in_secs = now - thirty_days_in_secs
|
||||
|
||||
sql = """
|
||||
@@ -289,11 +289,11 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||
user_id,
|
||||
last_seen,
|
||||
CASE
|
||||
WHEN user_agent LIKE '%Android%' THEN 'android'
|
||||
WHEN user_agent LIKE '%iOS%' THEN 'ios'
|
||||
WHEN user_agent LIKE '%Electron%' THEN 'electron'
|
||||
WHEN user_agent LIKE '%Mozilla%' THEN 'web'
|
||||
WHEN user_agent LIKE '%Gecko%' THEN 'web'
|
||||
WHEN user_agent LIKE '%%Android%%' THEN 'android'
|
||||
WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
|
||||
WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
|
||||
WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
|
||||
WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
|
||||
ELSE 'unknown'
|
||||
END
|
||||
AS platform
|
||||
|
||||
@@ -376,7 +376,7 @@ class SQLBaseStore(object):
|
||||
Returns:
|
||||
A list of dicts where the key is the column header.
|
||||
"""
|
||||
col_headers = list(intern(column[0]) for column in cursor.description)
|
||||
col_headers = list(intern(str(column[0])) for column in cursor.description)
|
||||
results = list(
|
||||
dict(zip(col_headers, row)) for row in cursor
|
||||
)
|
||||
|
||||
@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
|
||||
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
|
||||
# times give more inserts into the database even for readonly API hits
|
||||
# 120 seconds == 2 minutes
|
||||
LAST_SEEN_GRANULARITY = 120 * 1000
|
||||
LAST_SEEN_GRANULARITY = 10 * 60 * 1000
|
||||
|
||||
|
||||
class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||
|
||||
@@ -18,6 +18,7 @@ from .postgres import PostgresEngine
|
||||
from .sqlite3 import Sqlite3Engine
|
||||
|
||||
import importlib
|
||||
import platform
|
||||
|
||||
|
||||
SUPPORTED_MODULE = {
|
||||
@@ -31,6 +32,10 @@ def create_engine(database_config):
|
||||
engine_class = SUPPORTED_MODULE.get(name, None)
|
||||
|
||||
if engine_class:
|
||||
# pypy requires psycopg2cffi rather than psycopg2
|
||||
if (name == "psycopg2" and
|
||||
platform.python_implementation() == "PyPy"):
|
||||
name = "psycopg2cffi"
|
||||
module = importlib.import_module(name)
|
||||
return engine_class(module, database_config)
|
||||
|
||||
|
||||
@@ -613,6 +613,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||
self._rotate_notifs, 30 * 60 * 1000
|
||||
)
|
||||
|
||||
self._rotate_delay = 3
|
||||
self._rotate_count = 10000
|
||||
|
||||
def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
|
||||
all_events_and_contexts):
|
||||
"""Handles moving push actions from staging table to main
|
||||
@@ -809,7 +812,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||
)
|
||||
if caught_up:
|
||||
break
|
||||
yield sleep(5)
|
||||
yield sleep(self._rotate_delay)
|
||||
finally:
|
||||
self._doing_notif_rotation = False
|
||||
|
||||
@@ -830,8 +833,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
|
||||
txn.execute("""
|
||||
SELECT stream_ordering FROM event_push_actions
|
||||
WHERE stream_ordering > ?
|
||||
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
|
||||
""", (old_rotate_stream_ordering,))
|
||||
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
|
||||
""", (old_rotate_stream_ordering, self._rotate_count))
|
||||
stream_row = txn.fetchone()
|
||||
if stream_row:
|
||||
offset_stream_ordering, = stream_row
|
||||
|
||||
@@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore):
|
||||
new_forward_extremeties=new_forward_extremeties,
|
||||
)
|
||||
persist_event_counter.inc_by(len(chunk))
|
||||
synapse.metrics.event_persisted_position.set(
|
||||
chunk[-1][0].internal_metadata.stream_ordering,
|
||||
)
|
||||
for event, context in chunk:
|
||||
if context.app_service:
|
||||
origin_type = "local"
|
||||
@@ -1010,7 +1013,6 @@ class EventsStore(EventsWorkerStore):
|
||||
"event_edge_hashes",
|
||||
"event_edges",
|
||||
"event_forward_extremities",
|
||||
"event_push_actions",
|
||||
"event_reference_hashes",
|
||||
"event_search",
|
||||
"event_signatures",
|
||||
@@ -1030,6 +1032,14 @@ class EventsStore(EventsWorkerStore):
|
||||
[(ev.event_id,) for ev, _ in events_and_contexts]
|
||||
)
|
||||
|
||||
for table in (
|
||||
"event_push_actions",
|
||||
):
|
||||
txn.executemany(
|
||||
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
|
||||
[(ev.event_id,) for ev, _ in events_and_contexts]
|
||||
)
|
||||
|
||||
def _store_event_txn(self, txn, events_and_contexts):
|
||||
"""Insert new events into the event and event_json tables
|
||||
|
||||
|
||||
@@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
|
||||
|
||||
|
||||
class EventsWorkerStore(SQLBaseStore):
|
||||
def get_received_ts(self, event_id):
|
||||
"""Get received_ts (when it was persisted) for the event.
|
||||
|
||||
Raises an exception for unknown events.
|
||||
|
||||
Args:
|
||||
event_id (str)
|
||||
|
||||
Returns:
|
||||
Deferred[int|None]: Timestamp in milliseconds, or None for events
|
||||
that were persisted before received_ts was implemented.
|
||||
"""
|
||||
return self._simple_select_one_onecol(
|
||||
table="events",
|
||||
keyvalues={
|
||||
"event_id": event_id,
|
||||
},
|
||||
retcol="received_ts",
|
||||
desc="get_received_ts",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_event(self, event_id, check_redacted=True,
|
||||
|
||||
@@ -65,7 +65,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
|
||||
defer.returnValue(hosts)
|
||||
|
||||
@cached(max_entries=100000, iterable=True)
|
||||
@cachedInlineCallbacks(max_entries=100000, iterable=True)
|
||||
def get_users_in_room(self, room_id):
|
||||
def f(txn):
|
||||
sql = (
|
||||
@@ -79,7 +79,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
|
||||
txn.execute(sql, (room_id, Membership.JOIN,))
|
||||
return [to_ascii(r[0]) for r in txn]
|
||||
return self.runInteraction("get_users_in_room", f)
|
||||
start_time = self._clock.time_msec()
|
||||
result = yield self.runInteraction("get_users_in_room", f)
|
||||
end_time = self._clock.time_msec()
|
||||
logger.info(
|
||||
"Fetched room membership for %s (%i users) in %i ms",
|
||||
room_id, len(result), end_time - start_time,
|
||||
)
|
||||
defer.returnValue(result)
|
||||
|
||||
@cached()
|
||||
def get_invited_rooms_for_user(self, user_id):
|
||||
@@ -453,7 +460,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
||||
|
||||
defer.returnValue(joined_hosts)
|
||||
|
||||
@cached(max_entries=10000, iterable=True)
|
||||
@cached(max_entries=10000)
|
||||
def _get_joined_hosts_cache(self, room_id):
|
||||
return _JoinedHostsCache(self, room_id)
|
||||
|
||||
|
||||
@@ -721,7 +721,7 @@ def _parse_query(database_engine, search_term):
|
||||
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
|
||||
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
return " & ".join(result + ":*" for result in results)
|
||||
return " & ".join(result for result in results)
|
||||
elif isinstance(database_engine, Sqlite3Engine):
|
||||
return " & ".join(result + "*" for result in results)
|
||||
else:
|
||||
|
||||
@@ -20,7 +20,7 @@ from twisted.internet import defer
|
||||
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
|
||||
from synapse.util.caches import intern_string, get_cache_factor_for
|
||||
from synapse.util.caches.descriptors import cached, cachedList
|
||||
from synapse.util.caches.dictionary_cache import DictionaryCache
|
||||
from synapse.util.stringutils import to_ascii
|
||||
@@ -54,7 +54,7 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||
super(StateGroupWorkerStore, self).__init__(db_conn, hs)
|
||||
|
||||
self._state_group_cache = DictionaryCache(
|
||||
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
|
||||
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
|
||||
)
|
||||
|
||||
@cached(max_entries=100000, iterable=True)
|
||||
@@ -566,8 +566,7 @@ class StateGroupWorkerStore(SQLBaseStore):
|
||||
state_dict = results[group]
|
||||
|
||||
state_dict.update(
|
||||
((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
|
||||
for k, v in group_state_dict.iteritems()
|
||||
group_state_dict.iteritems()
|
||||
)
|
||||
|
||||
self._state_group_cache.update(
|
||||
|
||||
@@ -169,7 +169,7 @@ class DomainSpecificString(
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
__str__ = to_string
|
||||
__repr__ = to_string
|
||||
|
||||
|
||||
class UserID(DomainSpecificString):
|
||||
|
||||
@@ -18,6 +18,16 @@ import os
|
||||
|
||||
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
|
||||
|
||||
|
||||
def get_cache_factor_for(cache_name):
|
||||
env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
|
||||
factor = os.environ.get(env_var)
|
||||
if factor:
|
||||
return float(factor)
|
||||
|
||||
return CACHE_SIZE_FACTOR
|
||||
|
||||
|
||||
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
|
||||
|
||||
caches_by_name = {}
|
||||
|
||||
@@ -17,7 +17,7 @@ import logging
|
||||
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util import unwrapFirstError, logcontext
|
||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||
from synapse.util.caches import get_cache_factor_for
|
||||
from synapse.util.caches.lrucache import LruCache
|
||||
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
|
||||
from synapse.util.stringutils import to_ascii
|
||||
@@ -310,7 +310,7 @@ class CacheDescriptor(_CacheDescriptorBase):
|
||||
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
|
||||
cache_context=cache_context)
|
||||
|
||||
max_entries = int(max_entries * CACHE_SIZE_FACTOR)
|
||||
max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
|
||||
|
||||
self.max_entries = max_entries
|
||||
self.tree = tree
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
from synapse.util.async import ObservableDeferred
|
||||
from synapse.util.caches import metrics as cache_metrics
|
||||
|
||||
|
||||
class ResponseCache(object):
|
||||
@@ -24,20 +25,63 @@ class ResponseCache(object):
|
||||
used rather than trying to compute a new response.
|
||||
"""
|
||||
|
||||
def __init__(self, hs, timeout_ms=0):
|
||||
def __init__(self, hs, name, timeout_ms=0):
|
||||
self.pending_result_cache = {} # Requests that haven't finished yet.
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.timeout_sec = timeout_ms / 1000.
|
||||
|
||||
self._metrics = cache_metrics.register_cache(
|
||||
"response_cache",
|
||||
size_callback=lambda: self.size(),
|
||||
cache_name=name,
|
||||
)
|
||||
|
||||
def size(self):
|
||||
return len(self.pending_result_cache)
|
||||
|
||||
def get(self, key):
|
||||
"""Look up the given key.
|
||||
|
||||
Returns a deferred which doesn't follow the synapse logcontext rules,
|
||||
so you'll probably want to make_deferred_yieldable it.
|
||||
|
||||
Args:
|
||||
key (str):
|
||||
|
||||
Returns:
|
||||
twisted.internet.defer.Deferred|None: None if there is no entry
|
||||
for this key; otherwise a deferred result.
|
||||
"""
|
||||
result = self.pending_result_cache.get(key)
|
||||
if result is not None:
|
||||
self._metrics.inc_hits()
|
||||
return result.observe()
|
||||
else:
|
||||
self._metrics.inc_misses()
|
||||
return None
|
||||
|
||||
def set(self, key, deferred):
|
||||
"""Set the entry for the given key to the given deferred.
|
||||
|
||||
*deferred* should run its callbacks in the sentinel logcontext (ie,
|
||||
you should wrap normal synapse deferreds with
|
||||
logcontext.run_in_background).
|
||||
|
||||
Returns a new Deferred which also doesn't follow the synapse logcontext
|
||||
rules, so you will want to make_deferred_yieldable it
|
||||
|
||||
(TODO: before using this more widely, it might make sense to refactor
|
||||
it and get() so that they do the necessary wrapping rather than having
|
||||
to do it everywhere ResponseCache is used.)
|
||||
|
||||
Args:
|
||||
key (str):
|
||||
deferred (twisted.internet.defer.Deferred):
|
||||
|
||||
Returns:
|
||||
twisted.internet.defer.Deferred
|
||||
"""
|
||||
result = ObservableDeferred(deferred, consumeErrors=True)
|
||||
self.pending_result_cache[key] = result
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
|
||||
self.mock_scheduler = Mock()
|
||||
hs = Mock()
|
||||
hs.get_datastore = Mock(return_value=self.mock_store)
|
||||
self.mock_store.get_received_ts.return_value = 0
|
||||
hs.get_application_service_api = Mock(return_value=self.mock_as_api)
|
||||
hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
|
||||
hs.get_clock.return_value = MockClock()
|
||||
|
||||
@@ -123,6 +123,7 @@ class EventStreamPermissionsTestCase(RestTestCase):
|
||||
self.ratelimiter.send_message.return_value = (True, 0)
|
||||
hs.config.enable_registration_captcha = False
|
||||
hs.config.enable_registration = True
|
||||
hs.config.auto_join_rooms = []
|
||||
|
||||
hs.get_handlers().federation_handler = Mock()
|
||||
|
||||
|
||||
@@ -984,11 +984,13 @@ class RoomInitialSyncTestCase(RestTestCase):
|
||||
|
||||
self.assertTrue("presence" in response)
|
||||
|
||||
presence_by_user = {
|
||||
e["content"]["user_id"]: e for e in response["presence"]
|
||||
}
|
||||
self.assertTrue(self.user_id in presence_by_user)
|
||||
self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
|
||||
# presence is turned off on hotfixes
|
||||
|
||||
# presence_by_user = {
|
||||
# e["content"]["user_id"]: e for e in response["presence"]
|
||||
# }
|
||||
# self.assertTrue(self.user_id in presence_by_user)
|
||||
# self.assertEquals("m.presence", presence_by_user[self.user_id]["type"])
|
||||
|
||||
|
||||
class RoomMessageListTestCase(RestTestCase):
|
||||
|
||||
@@ -33,8 +33,6 @@ class DnsTestCase(unittest.TestCase):
|
||||
|
||||
service_name = "test_service.example.com"
|
||||
host_name = "example.com"
|
||||
ip_address = "127.0.0.1"
|
||||
ip6_address = "::1"
|
||||
|
||||
answer_srv = dns.RRHeader(
|
||||
type=dns.SRV,
|
||||
@@ -43,29 +41,9 @@ class DnsTestCase(unittest.TestCase):
|
||||
)
|
||||
)
|
||||
|
||||
answer_a = dns.RRHeader(
|
||||
type=dns.A,
|
||||
payload=dns.Record_A(
|
||||
address=ip_address,
|
||||
)
|
||||
)
|
||||
|
||||
answer_aaaa = dns.RRHeader(
|
||||
type=dns.AAAA,
|
||||
payload=dns.Record_AAAA(
|
||||
address=ip6_address,
|
||||
)
|
||||
)
|
||||
|
||||
dns_client_mock.lookupService.return_value = defer.succeed(
|
||||
([answer_srv], None, None),
|
||||
)
|
||||
dns_client_mock.lookupAddress.return_value = defer.succeed(
|
||||
([answer_a], None, None),
|
||||
)
|
||||
dns_client_mock.lookupIPV6Address.return_value = defer.succeed(
|
||||
([answer_aaaa], None, None),
|
||||
)
|
||||
|
||||
cache = {}
|
||||
|
||||
@@ -74,13 +52,10 @@ class DnsTestCase(unittest.TestCase):
|
||||
)
|
||||
|
||||
dns_client_mock.lookupService.assert_called_once_with(service_name)
|
||||
dns_client_mock.lookupAddress.assert_called_once_with(host_name)
|
||||
dns_client_mock.lookupIPV6Address.assert_called_once_with(host_name)
|
||||
|
||||
self.assertEquals(len(servers), 2)
|
||||
self.assertEquals(len(servers), 1)
|
||||
self.assertEquals(servers, cache[service_name])
|
||||
self.assertEquals(servers[0].host, ip_address)
|
||||
self.assertEquals(servers[1].host, ip6_address)
|
||||
self.assertEquals(servers[0].host, host_name)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_from_cache_expired_and_dns_fail(self):
|
||||
|
||||
@@ -212,7 +212,7 @@ class MockHttpResource(HttpServer):
|
||||
|
||||
headers = {}
|
||||
if federation_auth:
|
||||
headers["Authorization"] = ["X-Matrix origin=test,key=,sig="]
|
||||
headers[b"Authorization"] = ["X-Matrix origin=test,key=,sig="]
|
||||
mock_request.requestHeaders.getRawHeaders = mock_getRawHeaders(headers)
|
||||
|
||||
# return the right path if the event requires it
|
||||
|
||||
Reference in New Issue
Block a user