From 44ad6dd4bf7dad7ebfe554eabf78e56ba7a964c7 Mon Sep 17 00:00:00 2001 From: Krombel Date: Tue, 7 Nov 2017 13:35:35 +0100 Subject: [PATCH 01/65] update prometheus-config to new format --- contrib/prometheus/README | 19 +++++---- contrib/prometheus/synapse.rules | 71 +++++++++++++++++++++++++------- 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/contrib/prometheus/README b/contrib/prometheus/README index eb91db2de2..d2ddb3f6e4 100644 --- a/contrib/prometheus/README +++ b/contrib/prometheus/README @@ -5,15 +5,20 @@ To use it, first install prometheus by following the instructions at http://prometheus.io/ -Then add a new job to the main prometheus.conf file: +Then add a new job to the main prometheus.yml file: - job: { - name: "synapse" + - job_name: "synapse" + metrics_path: "/_synapse/metrics" + # when endpoint uses https: + scheme: "https" - target_group: { - target: "http://SERVER.LOCATION.HERE:PORT/_synapse/metrics" - } - } + static_configs: + - targets: ['SERVER.LOCATION:PORT'] + +To use `synapse.rules` add + + rule_files: + - "/PATH/TO/synapse.rules" Metrics are disabled by default when running synapse; they must be enabled with the 'enable-metrics' option, either in the synapse config file or as a diff --git a/contrib/prometheus/synapse.rules b/contrib/prometheus/synapse.rules index b6f84174b0..07e37a885e 100644 --- a/contrib/prometheus/synapse.rules +++ b/contrib/prometheus/synapse.rules @@ -1,21 +1,60 @@ -synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0) -synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0) +groups: +- name: synapse + rules: + - record: "synapse_federation_transaction_queue_pendingEdus:total" + expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)" + - record: "synapse_federation_transaction_queue_pendingPdus:total" + expr: "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)" + - record: 'synapse_http_server_requests:method' + labels: + servlet: "" + expr: "sum(synapse_http_server_requests) by (method)" + - record: 'synapse_http_server_requests:servlet' + labels: + method: "" + expr: 'sum(synapse_http_server_requests) by (servlet)' -synapse_http_server_requests:method{servlet=""} = sum(synapse_http_server_requests) by (method) -synapse_http_server_requests:servlet{method=""} = sum(synapse_http_server_requests) by (servlet) + - record: 'synapse_http_server_requests:total' + labels: + servlet: "" + expr: 'sum(synapse_http_server_requests:by_method) by (servlet)' -synapse_http_server_requests:total{servlet=""} = sum(synapse_http_server_requests:by_method) by (servlet) + - record: 'synapse_cache:hit_ratio_5m' + expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])' + - record: 'synapse_cache:hit_ratio_30s' + expr: 'rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])' -synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m]) -synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s]) + - record: 'synapse_federation_client_sent' + labels: + type: "EDU" + expr: 'synapse_federation_client_sent_edus + 0' + - record: 'synapse_federation_client_sent' + labels: + type: "PDU" + expr: 'synapse_federation_client_sent_pdu_destinations:count + 0' + - record: 'synapse_federation_client_sent' + labels: + type: "Query" + expr: 'sum(synapse_federation_client_sent_queries) by (job)' -synapse_federation_client_sent{type="EDU"} = synapse_federation_client_sent_edus + 0 -synapse_federation_client_sent{type="PDU"} = synapse_federation_client_sent_pdu_destinations:count + 0 -synapse_federation_client_sent{type="Query"} = sum(synapse_federation_client_sent_queries) by (job) + - record: 'synapse_federation_server_received' + labels: + type: "EDU" + expr: 'synapse_federation_server_received_edus + 0' + - record: 'synapse_federation_server_received' + labels: + type: "PDU" + expr: 'synapse_federation_server_received_pdus + 0' + - record: 'synapse_federation_server_received' + labels: + type: "Query" + expr: 'sum(synapse_federation_server_received_queries) by (job)' -synapse_federation_server_received{type="EDU"} = synapse_federation_server_received_edus + 0 -synapse_federation_server_received{type="PDU"} = synapse_federation_server_received_pdus + 0 -synapse_federation_server_received{type="Query"} = sum(synapse_federation_server_received_queries) by (job) - -synapse_federation_transaction_queue_pending{type="EDU"} = synapse_federation_transaction_queue_pending_edus + 0 -synapse_federation_transaction_queue_pending{type="PDU"} = synapse_federation_transaction_queue_pending_pdus + 0 + - record: 'synapse_federation_transaction_queue_pending' + labels: + type: "EDU" + expr: 'synapse_federation_transaction_queue_pending_edus + 0' + - record: 'synapse_federation_transaction_queue_pending' + labels: + type: "PDU" + expr: 'synapse_federation_transaction_queue_pending_pdus + 0' From 2a98ba0ed31bdd51ea43c0867bee2a5256f2a289 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 8 Nov 2017 10:35:30 +0000 Subject: [PATCH 02/65] Rename redact_content option to include_content The redact_content option never worked because it read the wrong config section. The PR introducing it (https://github.com/matrix-org/synapse/pull/2301) had feedback suggesting the name be changed to not re-use the term 'redact' but this wasn't incorporated. This reanmes the option to give it a less confusing name, and also means that people who've set the redact_content option won't suddenly see a behaviour change when upgrading synapse, but instead can set include_content if they want to. This PR also updates the wording of the config comment to clarify that this has no effect on event_id_only push. Includes https://github.com/matrix-org/synapse/pull/2422 --- synapse/config/push.py | 28 +++++++++++++--------------- synapse/push/httppusher.py | 3 ++- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/synapse/config/push.py b/synapse/config/push.py index 9c68318b40..01d4a49784 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,28 +19,25 @@ from ._base import Config class PushConfig(Config): def read_config(self, config): - self.push_redact_content = False + self.push_include_content = True - push_config = config.get("email", {}) - self.push_redact_content = push_config.get("redact_content", False) + push_config = config.get("push", {}) + self.push_include_content = push_config.get("include_content", True) def default_config(self, config_dir_path, server_name, **kwargs): return """ - # Control how push messages are sent to google/apple to notifications. - # Normally every message said in a room with one or more people using - # mobile devices will be posted to a push server hosted by matrix.org - # which is registered with google and apple in order to allow push - # notifications to be sent to these mobile devices. - # - # Setting redact_content to true will make the push messages contain no - # message content which will provide increased privacy. This is a - # temporary solution pending improvements to Android and iPhone apps - # to get content from the app rather than the notification. - # + # Clients requesting push notifications can either have the body of + # the message sent in the notification poke along with other details + # like the sender, or just the event ID and room ID (`event_id_only`). + # If clients choose the former, this option controls whether the + # notification request includes the content of the event (other details + # like the sender are still included). For `event_id_only` push, it + # has no effect. + # For modern android devices the notification content will still appear # because it is loaded by the app. iPhone, however will send a # notification saying only that a message arrived and who it came from. # #push: - # redact_content: false + # include_content: false """ diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 74c0bc462c..c16f61452c 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -295,7 +296,7 @@ class HttpPusher(object): if event.type == 'm.room.member': d['notification']['membership'] = event.content['membership'] d['notification']['user_is_target'] = event.state_key == self.user_id - if not self.hs.config.push_redact_content and 'content' in event: + if self.hs.config.push_include_content and 'content' in event: d['notification']['content'] = event.content # We no longer send aliases separately, instead, we send the human From 1b870937ae2de0ba510f0e1db40ae0e9a316d83f Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 8 Nov 2017 11:46:24 +0000 Subject: [PATCH 03/65] Log if any of the old config flags are set --- synapse/config/push.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/synapse/config/push.py b/synapse/config/push.py index 01d4a49784..861f5f31a7 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -16,14 +16,36 @@ from ._base import Config +import logging + +from twisted.internet import reactor + + +logger = logging.getLogger(__name__) + class PushConfig(Config): def read_config(self, config): - self.push_include_content = True - push_config = config.get("push", {}) self.push_include_content = push_config.get("include_content", True) + if push_config.get("redact_content") is not None: + reactor.callWhenRunning(lambda: logger.warn( + "The push.redact_content content option has never worked. " + "Please set push.include_content if you want this behaviour" + )) + + # There was a a 'redact_content' setting but mistakenly read from the + # 'email' section: check for it and honour it, with a warning. + push_config = config.get("email", {}) + redact_content = push_config.get("redact_content") + if redact_content is not None: + reactor.callWhenRunning(lambda: logger.warn( + "The 'email.redact_content' option is deprecated: " + "please set push.include_content instead" + )) + self.push_include_content = not redact_content + def default_config(self, config_dir_path, server_name, **kwargs): return """ # Clients requesting push notifications can either have the body of From ad408beb663052bc5700015db4716583f40a4536 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 8 Nov 2017 11:50:08 +0000 Subject: [PATCH 04/65] better comments --- synapse/config/push.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/config/push.py b/synapse/config/push.py index 861f5f31a7..bbfeb05d50 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -29,14 +29,17 @@ class PushConfig(Config): push_config = config.get("push", {}) self.push_include_content = push_config.get("include_content", True) + # There was a a 'redact_content' setting but mistakenly read from the + # 'email'section'. Check for the flag in the 'push' section, and log, + # but do not honour it to avoid nasty surprises when people upgrade. if push_config.get("redact_content") is not None: reactor.callWhenRunning(lambda: logger.warn( "The push.redact_content content option has never worked. " "Please set push.include_content if you want this behaviour" )) - # There was a a 'redact_content' setting but mistakenly read from the - # 'email' section: check for it and honour it, with a warning. + # Now check for the one in the 'email' section and honour it, + # with a warning. push_config = config.get("email", {}) redact_content = push_config.get("redact_content") if redact_content is not None: From b2a788e902c6fb6d3c516177fbb9f7e201e5cf0e Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 9 Nov 2017 10:11:42 +0000 Subject: [PATCH 05/65] Make the commented config have the default --- synapse/config/push.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/config/push.py b/synapse/config/push.py index bbfeb05d50..8fc1b98eba 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -64,5 +64,5 @@ class PushConfig(Config): # notification saying only that a message arrived and who it came from. # #push: - # include_content: false + # include_content: true """ From f90649eb2b0988c771fa329ba7a0a5ba81fe2396 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 10 Nov 2017 09:15:39 +0000 Subject: [PATCH 06/65] Fix 500 on invalid utf-8 in request If somebody sends us a request where the the body is invalid utf-8, we should return a 400 rather than a 500. (json.loads throws a UnicodeError in this situation) We might as well catch all Exceptions here: it seems very unlikely that we would get a request that *isn't caused by invalid json. --- synapse/http/servlet.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 8118ee7cc2..71420e54db 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -167,7 +167,8 @@ def parse_json_value_from_request(request): try: content = simplejson.loads(content_bytes) - except simplejson.JSONDecodeError: + except Exception as e: + logger.warn("Unable to parse JSON: %s", e) raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) return content From 46790f50cfcc1049974b468d4b08402935e8ac84 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 10 Nov 2017 16:34:33 +0000 Subject: [PATCH 07/65] Cache failures in url_preview handler Reshuffle the caching logic in the url_preview handler so that failures are cached (and to generally simplify things and fix the logcontext leaks). --- synapse/rest/media/v1/preview_url_resource.py | 86 ++++++++++--------- 1 file changed, 45 insertions(+), 41 deletions(-) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 7907a9d17a..38e1afd34b 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -20,6 +20,7 @@ from twisted.web.resource import Resource from synapse.api.errors import ( SynapseError, Codes, ) +from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.stringutils import random_string from synapse.util.caches.expiringcache import ExpiringCache from synapse.http.client import SpiderHttpClient @@ -63,16 +64,15 @@ class PreviewUrlResource(Resource): self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist - # simple memory cache mapping urls to OG metadata - self.cache = ExpiringCache( + # memory cache mapping urls to an ObservableDeferred returning + # JSON-encoded OG metadata + self._cache = ExpiringCache( cache_name="url_previews", clock=self.clock, # don't spider URLs more often than once an hour expiry_ms=60 * 60 * 1000, ) - self.cache.start() - - self.downloads = {} + self._cache.start() self._cleaner_loop = self.clock.looping_call( self._expire_url_cache_data, 10 * 1000 @@ -94,6 +94,7 @@ class PreviewUrlResource(Resource): else: ts = self.clock.time_msec() + # XXX: we could move this into _do_preview if we wanted. url_tuple = urlparse.urlsplit(url) for entry in self.url_preview_url_blacklist: match = True @@ -126,14 +127,40 @@ class PreviewUrlResource(Resource): Codes.UNKNOWN ) - # first check the memory cache - good to handle all the clients on this - # HS thundering away to preview the same URL at the same time. - og = self.cache.get(url) - if og: - respond_with_json_bytes(request, 200, json.dumps(og), send_cors=True) - return + # the in-memory cache: + # * ensures that only one request is active at a time + # * takes load off the DB for the thundering herds + # * also caches any failures (unlike the DB) so we don't keep + # requesting the same endpoint - # then check the URL cache in the DB (which will also provide us with + observable = self._cache.get(url) + + if not observable: + download = preserve_fn(self._do_preview)( + url, requester.user, ts, + ) + observable = ObservableDeferred( + download, + consumeErrors=True + ) + self._cache[url] = observable + + og = yield make_deferred_yieldable(observable.observe()) + respond_with_json_bytes(request, 200, og, send_cors=True) + + @defer.inlineCallbacks + def _do_preview(self, url, user, ts): + """Check the db, and download the URL and build a preview + + Args: + url (str): + user (str): + ts (int): + + Returns: + Deferred[str]: json-encoded og data + """ + # check the URL cache in the DB (which will also provide us with # historical previews, if we have any) cache_result = yield self.store.get_url_cache(url, ts) if ( @@ -141,32 +168,10 @@ class PreviewUrlResource(Resource): cache_result["expires_ts"] > ts and cache_result["response_code"] / 100 == 2 ): - respond_with_json_bytes( - request, 200, cache_result["og"].encode('utf-8'), - send_cors=True - ) + defer.returnValue(cache_result["og"]) return - # Ensure only one download for a given URL is active at a time - download = self.downloads.get(url) - if download is None: - download = self._download_url(url, requester.user) - download = ObservableDeferred( - download, - consumeErrors=True - ) - self.downloads[url] = download - - @download.addBoth - def callback(media_info): - del self.downloads[url] - return media_info - media_info = yield download.observe() - - # FIXME: we should probably update our cache now anyway, so that - # even if the OG calculation raises, we don't keep hammering on the - # remote server. For now, leave it uncached to aid debugging OG - # calculation problems + media_info = yield self._download_url(url, user) logger.debug("got media_info of '%s'" % media_info) @@ -212,7 +217,7 @@ class PreviewUrlResource(Resource): # just rely on the caching on the master request to speed things up. if 'og:image' in og and og['og:image']: image_info = yield self._download_url( - _rebase_url(og['og:image'], media_info['uri']), requester.user + _rebase_url(og['og:image'], media_info['uri']), user ) if _is_media(image_info['media_type']): @@ -239,8 +244,7 @@ class PreviewUrlResource(Resource): logger.debug("Calculated OG for %s as %s" % (url, og)) - # store OG in ephemeral in-memory cache - self.cache[url] = og + jsonog = json.dumps(og) # store OG in history-aware DB cache yield self.store.store_url_cache( @@ -248,12 +252,12 @@ class PreviewUrlResource(Resource): media_info["response_code"], media_info["etag"], media_info["expires"] + media_info["created_ts"], - json.dumps(og), + jsonog, media_info["filesystem_id"], media_info["created_ts"], ) - respond_with_json_bytes(request, 200, json.dumps(og), send_cors=True) + defer.returnValue(jsonog) @defer.inlineCallbacks def _download_url(self, url, user): From 5d15abb120a483395804d7e500dfa0bc42e49d51 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 10 Nov 2017 16:58:04 +0000 Subject: [PATCH 08/65] Bit more logging --- synapse/rest/media/v1/preview_url_resource.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 38e1afd34b..723f7043f4 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -144,6 +144,8 @@ class PreviewUrlResource(Resource): consumeErrors=True ) self._cache[url] = observable + else: + logger.info("Returning cached response") og = yield make_deferred_yieldable(observable.observe()) respond_with_json_bytes(request, 200, og, send_cors=True) From 2d314b771f032441595f931210fde67d25b90075 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Sun, 12 Nov 2017 23:30:23 -0700 Subject: [PATCH 09/65] Add a route for determining who you are Useful for applications which may have an access token, but no idea as to who owns it. Signed-off-by: Travis Ralston --- synapse/rest/client/v2_alpha/account.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 3062e04c59..0efbcb10d7 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -382,6 +382,22 @@ class ThreepidDeleteRestServlet(RestServlet): defer.returnValue((200, {})) +class WhoamiRestServlet(RestServlet): + PATTERNS = client_v2_patterns("/account/whoami$") + + def __init__(self, hs): + super(WhoamiRestServlet, self).__init__() + self.auth = hs.get_auth() + + @defer.inlineCallbacks + def on_GET(self, request): + yield run_on_reactor() + + requester = yield self.auth.get_user_by_req(request) + + defer.returnValue((200, {'user_id': requester.user.to_string()})) + + def register_servlets(hs, http_server): EmailPasswordRequestTokenRestServlet(hs).register(http_server) MsisdnPasswordRequestTokenRestServlet(hs).register(http_server) @@ -391,3 +407,4 @@ def register_servlets(hs, http_server): MsisdnThreepidRequestTokenRestServlet(hs).register(http_server) ThreepidRestServlet(hs).register(http_server) ThreepidDeleteRestServlet(hs).register(http_server) + WhoamiRestServlet(hs).register(http_server) From 6cfee09be9b5f58b83ef30bb35fa70453c7c2329 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Nov 2017 18:51:27 +0000 Subject: [PATCH 10/65] Make __init__ consitstent across Store heirarchy Add db_conn parameters to the `__init__` methods of the *Store classes, so that they are all consistent, which makes the multiple inheritance work correctly (and so that we can later extract mixins which can be used in the slavedstores) --- synapse/replication/slave/storage/_base.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/appservice.py | 8 ++++---- synapse/storage/background_updates.py | 4 ++-- synapse/storage/client_ips.py | 4 ++-- synapse/storage/deviceinbox.py | 4 ++-- synapse/storage/devices.py | 4 ++-- synapse/storage/event_federation.py | 4 ++-- synapse/storage/event_push_actions.py | 4 ++-- synapse/storage/events.py | 4 ++-- synapse/storage/receipts.py | 4 ++-- synapse/storage/registration.py | 4 ++-- synapse/storage/roommember.py | 4 ++-- synapse/storage/search.py | 4 ++-- synapse/storage/state.py | 4 ++-- synapse/storage/transactions.py | 4 ++-- 17 files changed, 33 insertions(+), 33 deletions(-) diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index b962641166..61f5590c53 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) class BaseSlavedStore(SQLBaseStore): def __init__(self, db_conn, hs): - super(BaseSlavedStore, self).__init__(hs) + super(BaseSlavedStore, self).__init__(db_conn, hs) if isinstance(self.database_engine, PostgresEngine): self._cache_id_gen = SlavedIdTracker( db_conn, "cache_invalidation_stream", "stream_id", diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 594566eb38..d01d46338a 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -268,7 +268,7 @@ class DataStore(RoomMemberStore, RoomStore, self._stream_order_on_start = self.get_room_max_stream_ordering() self._min_stream_order_on_start = self.get_room_min_stream_ordering() - super(DataStore, self).__init__(hs) + super(DataStore, self).__init__(db_conn, hs) def take_presence_startup_info(self): active_on_startup = self._presence_on_startup diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6caf7b3356..e94917d9cd 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -162,7 +162,7 @@ class PerformanceCounters(object): class SQLBaseStore(object): _TXN_ID = 0 - def __init__(self, hs): + def __init__(self, db_conn, hs): self.hs = hs self._clock = hs.get_clock() self._db_pool = hs.get_db_pool() diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index c63935cb07..d8c84b7141 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -48,8 +48,8 @@ def _make_exclusive_regex(services_cache): class ApplicationServiceStore(SQLBaseStore): - def __init__(self, hs): - super(ApplicationServiceStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(ApplicationServiceStore, self).__init__(db_conn, hs) self.hostname = hs.hostname self.services_cache = load_appservices( hs.hostname, @@ -173,8 +173,8 @@ class ApplicationServiceStore(SQLBaseStore): class ApplicationServiceTransactionStore(SQLBaseStore): - def __init__(self, hs): - super(ApplicationServiceTransactionStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(ApplicationServiceTransactionStore, self).__init__(db_conn, hs) @defer.inlineCallbacks def get_appservices_by_state(self, state): diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index a6e6f52a6a..6f235ac051 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -80,8 +80,8 @@ class BackgroundUpdateStore(SQLBaseStore): BACKGROUND_UPDATE_INTERVAL_MS = 1000 BACKGROUND_UPDATE_DURATION_MS = 100 - def __init__(self, hs): - super(BackgroundUpdateStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(BackgroundUpdateStore, self).__init__(db_conn, hs) self._background_update_performance = {} self._background_update_queue = [] self._background_update_handlers = {} diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 3c95e90eca..a03d1d6104 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -32,14 +32,14 @@ LAST_SEEN_GRANULARITY = 120 * 1000 class ClientIpStore(background_updates.BackgroundUpdateStore): - def __init__(self, hs): + def __init__(self, db_conn, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR, ) - super(ClientIpStore, self).__init__(hs) + super(ClientIpStore, self).__init__(db_conn, hs) self.register_background_index_update( "user_ips_device_index", diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 0b62b493d5..548e795daf 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -29,8 +29,8 @@ logger = logging.getLogger(__name__) class DeviceInboxStore(BackgroundUpdateStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" - def __init__(self, hs): - super(DeviceInboxStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(DeviceInboxStore, self).__init__(db_conn, hs) self.register_background_index_update( "device_inbox_stream_index", diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index bb27fd1f70..bd2effdf34 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -26,8 +26,8 @@ logger = logging.getLogger(__name__) class DeviceStore(SQLBaseStore): - def __init__(self, hs): - super(DeviceStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(DeviceStore, self).__init__(db_conn, hs) # Map of (user_id, device_id) -> bool. If there is an entry that implies # the device exists. diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index e8133de2fa..55a05c59d5 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -39,8 +39,8 @@ class EventFederationStore(SQLBaseStore): EVENT_AUTH_STATE_ONLY = "event_auth_state_only" - def __init__(self, hs): - super(EventFederationStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(EventFederationStore, self).__init__(db_conn, hs) self.register_background_update_handler( self.EVENT_AUTH_STATE_ONLY, diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d6d8723b4a..8efe2fd4bb 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -65,8 +65,8 @@ def _deserialize_action(actions, is_highlight): class EventPushActionsStore(SQLBaseStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" - def __init__(self, hs): - super(EventPushActionsStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(EventPushActionsStore, self).__init__(db_conn, hs) self.register_background_index_update( self.EPA_HIGHLIGHT_INDEX, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4298d8baf1..d08f7571d7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -197,8 +197,8 @@ class EventsStore(SQLBaseStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" - def __init__(self, hs): - super(EventsStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(EventsStore, self).__init__(db_conn, hs) self._clock = hs.get_clock() self.register_background_update_handler( self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index f42b8014c7..12b3cc7f5f 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -27,8 +27,8 @@ logger = logging.getLogger(__name__) class ReceiptsStore(SQLBaseStore): - def __init__(self, hs): - super(ReceiptsStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(ReceiptsStore, self).__init__(db_conn, hs) self._receipts_stream_cache = StreamChangeCache( "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token() diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 71748de733..8b9544c209 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -24,8 +24,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks class RegistrationStore(background_updates.BackgroundUpdateStore): - def __init__(self, hs): - super(RegistrationStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(RegistrationStore, self).__init__(db_conn, hs) self.clock = hs.get_clock() diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 3fa8019eb7..3e77fd3901 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -49,8 +49,8 @@ _MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update" class RoomMemberStore(SQLBaseStore): - def __init__(self, hs): - super(RoomMemberStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(RoomMemberStore, self).__init__(db_conn, hs) self.register_background_update_handler( _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile ) diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 05d4ef586e..479b04c636 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -33,8 +33,8 @@ class SearchStore(BackgroundUpdateStore): EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" - def __init__(self, hs): - super(SearchStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(SearchStore, self).__init__(db_conn, hs) self.register_background_update_handler( self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search ) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 5673e4aa96..dd01b68762 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -63,8 +63,8 @@ class StateStore(SQLBaseStore): STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" - def __init__(self, hs): - super(StateStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(StateStore, self).__init__(db_conn, hs) self.register_background_update_handler( self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, self._background_deduplicate_state, diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 809fdd311f..8f61f7ffae 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -46,8 +46,8 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ - def __init__(self, hs): - super(TransactionStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(TransactionStore, self).__init__(db_conn, hs) self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000) From 63ef607f1f6a9f998796cd3b6bcbcdb95fd08557 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Nov 2017 20:53:11 +0000 Subject: [PATCH 11/65] Fix tests for Store.__init__ update Fix the test to pass the right number of args to the Store constructors --- tests/storage/test_appservice.py | 14 +++++++------- tests/storage/test_base.py | 2 +- tests/storage/test_directory.py | 2 +- tests/storage/test_presence.py | 2 +- tests/storage/test_profile.py | 2 +- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 79f569e787..13d81f972b 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -58,7 +58,7 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): self._add_appservice("token2", "as2", "some_url", "some_hs_token", "bob") self._add_appservice("token3", "as3", "some_url", "some_hs_token", "bob") # must be done after inserts - self.store = ApplicationServiceStore(hs) + self.store = ApplicationServiceStore(None, hs) def tearDown(self): # TODO: suboptimal that we need to create files for tests! @@ -150,7 +150,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): self.as_yaml_files = [] - self.store = TestTransactionStore(hs) + self.store = TestTransactionStore(None, hs) def _add_service(self, url, as_token, id): as_yaml = dict(url=url, as_token=as_token, hs_token="something", @@ -420,8 +420,8 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore): - def __init__(self, hs): - super(TestTransactionStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(TestTransactionStore, self).__init__(db_conn, hs) class ApplicationServiceStoreConfigTestCase(unittest.TestCase): @@ -458,7 +458,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): replication_layer=Mock(), ) - ApplicationServiceStore(hs) + ApplicationServiceStore(None, hs) @defer.inlineCallbacks def test_duplicate_ids(self): @@ -477,7 +477,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): ) with self.assertRaises(ConfigError) as cm: - ApplicationServiceStore(hs) + ApplicationServiceStore(None, hs) e = cm.exception self.assertIn(f1, e.message) @@ -501,7 +501,7 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): ) with self.assertRaises(ConfigError) as cm: - ApplicationServiceStore(hs) + ApplicationServiceStore(None, hs) e = cm.exception self.assertIn(f1, e.message) diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 91e971190c..0ac910e76f 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -56,7 +56,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): database_engine=create_engine(config.database_config), ) - self.datastore = SQLBaseStore(hs) + self.datastore = SQLBaseStore(None, hs) @defer.inlineCallbacks def test_insert_1col(self): diff --git a/tests/storage/test_directory.py b/tests/storage/test_directory.py index b087892e0b..95709cd50a 100644 --- a/tests/storage/test_directory.py +++ b/tests/storage/test_directory.py @@ -29,7 +29,7 @@ class DirectoryStoreTestCase(unittest.TestCase): def setUp(self): hs = yield setup_test_homeserver() - self.store = DirectoryStore(hs) + self.store = DirectoryStore(None, hs) self.room = RoomID.from_string("!abcde:test") self.alias = RoomAlias.from_string("#my-room:test") diff --git a/tests/storage/test_presence.py b/tests/storage/test_presence.py index 63203cea35..f5fcb611d4 100644 --- a/tests/storage/test_presence.py +++ b/tests/storage/test_presence.py @@ -29,7 +29,7 @@ class PresenceStoreTestCase(unittest.TestCase): def setUp(self): hs = yield setup_test_homeserver(clock=MockClock()) - self.store = PresenceStore(hs) + self.store = PresenceStore(None, hs) self.u_apple = UserID.from_string("@apple:test") self.u_banana = UserID.from_string("@banana:test") diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py index 24118bbc86..423710c9c1 100644 --- a/tests/storage/test_profile.py +++ b/tests/storage/test_profile.py @@ -29,7 +29,7 @@ class ProfileStoreTestCase(unittest.TestCase): def setUp(self): hs = yield setup_test_homeserver() - self.store = ProfileStore(hs) + self.store = ProfileStore(None, hs) self.u_frank = UserID.from_string("@frank:test") From 45ab288e072341447cc6375f37df8bace3d1c525 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 13 Nov 2017 18:32:08 +0000 Subject: [PATCH 12/65] Print instead of logging because we had to wait until the logger was set up --- synapse/config/push.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/synapse/config/push.py b/synapse/config/push.py index 8fc1b98eba..b7e0d46afa 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -16,13 +16,6 @@ from ._base import Config -import logging - -from twisted.internet import reactor - - -logger = logging.getLogger(__name__) - class PushConfig(Config): def read_config(self, config): @@ -33,20 +26,20 @@ class PushConfig(Config): # 'email'section'. Check for the flag in the 'push' section, and log, # but do not honour it to avoid nasty surprises when people upgrade. if push_config.get("redact_content") is not None: - reactor.callWhenRunning(lambda: logger.warn( + print( "The push.redact_content content option has never worked. " "Please set push.include_content if you want this behaviour" - )) + ) # Now check for the one in the 'email' section and honour it, # with a warning. push_config = config.get("email", {}) redact_content = push_config.get("redact_content") if redact_content is not None: - reactor.callWhenRunning(lambda: logger.warn( + print( "The 'email.redact_content' option is deprecated: " "please set push.include_content instead" - )) + ) self.push_include_content = not redact_content def default_config(self, config_dir_path, server_name, **kwargs): From 812c1919392c8ae8aa93969fb0679bd03d73da05 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Mon, 13 Nov 2017 12:44:21 -0700 Subject: [PATCH 13/65] Remove redundent call Signed-off-by: Travis Ralston --- synapse/rest/client/v2_alpha/account.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 0efbcb10d7..726e0a2826 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -391,8 +391,6 @@ class WhoamiRestServlet(RestServlet): @defer.inlineCallbacks def on_GET(self, request): - yield run_on_reactor() - requester = yield self.auth.get_user_by_req(request) defer.returnValue((200, {'user_id': requester.user.to_string()})) From 1fc66c7460b7e6c503dbeb6577fb0ba3cf7dfd83 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 Nov 2017 09:23:56 +0000 Subject: [PATCH 14/65] Add a load of logging to the room_list handler So we can see what it gets up to. --- synapse/handlers/room_list.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 2cf34e51cb..928ee38aea 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -154,6 +154,8 @@ class RoomListHandler(BaseHandler): # We want larger rooms to be first, hence negating num_joined_users rooms_to_order_value[room_id] = (-num_joined_users, room_id) + logger.info("Getting ordering for %i rooms since %s", + len(room_ids), stream_token) yield concurrently_execute(get_order_for_room, room_ids, 10) sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) @@ -181,17 +183,25 @@ class RoomListHandler(BaseHandler): rooms_to_scan = rooms_to_scan[:since_token.current_limit] rooms_to_scan.reverse() + logger.info("After sorting and filtering, %i rooms remain", + len(rooms_to_scan)) + # Actually generate the entries. _append_room_entry_to_chunk will append to # chunk but will stop if len(chunk) > limit chunk = [] if limit and not search_filter: step = limit + 1 for i in xrange(0, len(rooms_to_scan), step): + logger.info("Processing %i rooms for result", step) # We iterate here because the vast majority of cases we'll stop # at first iteration, but occaisonally _append_room_entry_to_chunk # won't append to the chunk and so we need to loop again. # We don't want to scan over the entire range either as that # would potentially waste a lot of work. + # + # XXX why would that happen? _append_room_entry_to_chunk will + # only exclude rooms which don't match search_filter, but we + # know search_filter is None here. yield concurrently_execute( lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], @@ -199,9 +209,11 @@ class RoomListHandler(BaseHandler): ), rooms_to_scan[i:i + step], 10 ) + logger.info("Now %i rooms in result", len(chunk)) if len(chunk) >= limit + 1: break else: + logger.info("Processing %i rooms for result", len(rooms_to_scan)) yield concurrently_execute( lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], @@ -209,6 +221,7 @@ class RoomListHandler(BaseHandler): ), rooms_to_scan, 5 ) + logger.info("Now %i rooms in result", len(chunk)) chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) From 44a1bfd6a6a1cda272677c9ea8704957bc940509 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 Nov 2017 09:39:54 +0000 Subject: [PATCH 15/65] Reshuffle room list request code I'm not entirely sure if this will actually help anything, but it simplifies the code and might give further clues about why room list search requests are blowing out the get_current_state_ids caches. --- synapse/handlers/room_list.py | 51 +++++++++++++++++------------------ 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 928ee38aea..bb40075387 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -186,42 +186,39 @@ class RoomListHandler(BaseHandler): logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan)) - # Actually generate the entries. _append_room_entry_to_chunk will append to - # chunk but will stop if len(chunk) > limit - chunk = [] - if limit and not search_filter: + # _append_room_entry_to_chunk will append to chunk but will stop if + # len(chunk) > limit + # + # Normally we will generate enough results on the first iteration here, + # but if there is a search filter, _append_room_entry_to_chunk may + # filter some results out, in which case we loop again. + # + # We don't want to scan over the entire range either as that + # would potentially waste a lot of work. + # + # XXX if there is no limit, we may end up DoSing the server with + # calls to get_current_state_ids for every single room on the + # server. Surely we should cap this somehow? + # + if limit: step = limit + 1 - for i in xrange(0, len(rooms_to_scan), step): - logger.info("Processing %i rooms for result", step) - # We iterate here because the vast majority of cases we'll stop - # at first iteration, but occaisonally _append_room_entry_to_chunk - # won't append to the chunk and so we need to loop again. - # We don't want to scan over the entire range either as that - # would potentially waste a lot of work. - # - # XXX why would that happen? _append_room_entry_to_chunk will - # only exclude rooms which don't match search_filter, but we - # know search_filter is None here. - yield concurrently_execute( - lambda r: self._append_room_entry_to_chunk( - r, rooms_to_num_joined[r], - chunk, limit, search_filter - ), - rooms_to_scan[i:i + step], 10 - ) - logger.info("Now %i rooms in result", len(chunk)) - if len(chunk) >= limit + 1: - break else: - logger.info("Processing %i rooms for result", len(rooms_to_scan)) + step = len(rooms_to_scan) + + chunk = [] + for i in xrange(0, len(rooms_to_scan), step): + batch = rooms_to_scan[i:i + step] + logger.info("Processing %i rooms for result", len(batch)) yield concurrently_execute( lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], chunk, limit, search_filter ), - rooms_to_scan, 5 + batch, 5, ) logger.info("Now %i rooms in result", len(chunk)) + if len(chunk) >= limit + 1: + break chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) From 7e6fa29cb5ba1abd8b4f3873b0ef171c7c8aba26 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 Nov 2017 11:22:42 +0000 Subject: [PATCH 16/65] Remove preserve_context_over_{fn, deferred} Both of these functions ae known to leak logcontexts. Replace the remaining calls to them and kill them off. --- docs/log_contexts.rst | 4 ---- synapse/federation/federation_client.py | 4 ++-- synapse/handlers/appservice.py | 4 ++-- synapse/handlers/initial_sync.py | 4 ++-- synapse/push/pusherpool.py | 6 ++--- synapse/storage/stream.py | 4 ++-- synapse/util/async.py | 6 ++--- synapse/util/distributor.py | 24 +++++++------------ synapse/util/logcontext.py | 31 ------------------------- synapse/visibility.py | 4 ++-- 10 files changed, 24 insertions(+), 67 deletions(-) diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index eb1784e700..b19b7fa1ea 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -298,10 +298,6 @@ It can be used like this: # this will now be logged against the request context logger.debug("Request handling complete") -XXX: I think ``preserve_context_over_fn`` is supposed to do the first option, -but the fact that it does ``preserve_context_over_deferred`` on its results -means that its use is fraught with difficulty. - Passing synapse deferreds into third-party functions ---------------------------------------------------- diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7c5e5d957f..b8f02f5391 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,7 +25,7 @@ from synapse.api.errors import ( from synapse.util import unwrapFirstError, logcontext from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.events import FrozenEvent, builder import synapse.metrics @@ -420,7 +420,7 @@ class FederationClient(FederationBase): for e_id in batch ] - res = yield preserve_context_over_deferred( + res = yield make_deferred_yieldable( defer.DeferredList(deferreds, consumeErrors=True) ) for success, result in res: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 543bf28aec..feca3e4c10 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.util.metrics import Measure -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn import logging @@ -159,7 +159,7 @@ class ApplicationServicesHandler(object): def query_3pe(self, kind, protocol, fields): services = yield self._get_services_for_3pn(protocol) - results = yield preserve_context_over_deferred(defer.DeferredList([ + results = yield make_deferred_yieldable(defer.DeferredList([ preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields) for service in services ], consumeErrors=True)) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 9718d4abc5..c5267b4b84 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -27,7 +27,7 @@ from synapse.types import ( from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -163,7 +163,7 @@ class InitialSyncHandler(BaseHandler): lambda states: states[event.event_id] ) - (messages, token), current_state = yield preserve_context_over_deferred( + (messages, token), current_state = yield make_deferred_yieldable( defer.gatherResults( [ preserve_fn(self.store.get_recent_events_for_room)( diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 7c069b662e..34cb108dcb 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .pusher import PusherFactory -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.util.async import run_on_reactor import logging @@ -136,7 +136,7 @@ class PusherPool: ) ) - yield preserve_context_over_deferred(defer.gatherResults(deferreds)) + yield make_deferred_yieldable(defer.gatherResults(deferreds)) except Exception: logger.exception("Exception in pusher on_new_notifications") @@ -161,7 +161,7 @@ class PusherPool: preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) ) - yield preserve_context_over_deferred(defer.gatherResults(deferreds)) + yield make_deferred_yieldable(defer.gatherResults(deferreds)) except Exception: logger.exception("Exception in pusher on_new_receipts") diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index dddd5fc0e7..52bdce5be2 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -39,7 +39,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -234,7 +234,7 @@ class StreamStore(SQLBaseStore): results = {} room_ids = list(room_ids) for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)): - res = yield preserve_context_over_deferred(defer.gatherResults([ + res = yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(self.get_room_events_stream_for_room)( room_id, from_key, to_key, limit, order=order, ) diff --git a/synapse/util/async.py b/synapse/util/async.py index e786fb38a9..0729bb2863 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -17,7 +17,7 @@ from twisted.internet import defer, reactor from .logcontext import ( - PreserveLoggingContext, preserve_fn, preserve_context_over_deferred, + PreserveLoggingContext, make_deferred_yieldable, preserve_fn ) from synapse.util import logcontext, unwrapFirstError @@ -351,7 +351,7 @@ class ReadWriteLock(object): # We wait for the latest writer to finish writing. We can safely ignore # any existing readers... as they're readers. - yield curr_writer + yield make_deferred_yieldable(curr_writer) @contextmanager def _ctx_manager(): @@ -380,7 +380,7 @@ class ReadWriteLock(object): curr_readers.clear() self.key_to_current_writer[key] = new_defer - yield preserve_context_over_deferred(defer.gatherResults(to_wait_on)) + yield make_deferred_yieldable(defer.gatherResults(to_wait_on)) @contextmanager def _ctx_manager(): diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index e68f94ce77..734331caaa 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -13,32 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - -from synapse.util.logcontext import ( - PreserveLoggingContext, preserve_context_over_fn -) - -from synapse.util import unwrapFirstError - import logging +from twisted.internet import defer + +from synapse.util import unwrapFirstError +from synapse.util.logcontext import PreserveLoggingContext logger = logging.getLogger(__name__) def user_left_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_left_room", user=user, room_id=room_id - ) + with PreserveLoggingContext(): + distributor.fire("user_left_room", user=user, room_id=room_id) def user_joined_room(distributor, user, room_id): - return preserve_context_over_fn( - distributor.fire, - "user_joined_room", user=user, room_id=room_id - ) + with PreserveLoggingContext(): + distributor.fire("user_joined_room", user=user, room_id=room_id) class Distributor(object): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 9683cc7265..92b9413a35 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -291,37 +291,6 @@ class _PreservingContextDeferred(defer.Deferred): return g -def preserve_context_over_fn(fn, *args, **kwargs): - """Takes a function and invokes it with the given arguments, but removes - and restores the current logging context while doing so. - - If the result is a deferred, call preserve_context_over_deferred before - returning it. - """ - with PreserveLoggingContext(): - res = fn(*args, **kwargs) - - if isinstance(res, defer.Deferred): - return preserve_context_over_deferred(res) - else: - return res - - -def preserve_context_over_deferred(deferred, context=None): - """Given a deferred wrap it such that any callbacks added later to it will - be invoked with the current context. - - Deprecated: this almost certainly doesn't do want you want, ie make - the deferred follow the synapse logcontext rules: try - ``make_deferred_yieldable`` instead. - """ - if context is None: - context = LoggingContext.current_context() - d = _PreservingContextDeferred(context) - deferred.chainDeferred(d) - return d - - def preserve_fn(f): """Wraps a function, to ensure that the current context is restored after return from the function, and that the sentinel context is set once the diff --git a/synapse/visibility.py b/synapse/visibility.py index d7dbdc77ff..aaca2c584c 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -17,7 +17,7 @@ from twisted.internet import defer from synapse.api.constants import Membership, EventTypes -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn import logging @@ -58,7 +58,7 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state, always_include_ids (set(event_id)): set of event ids to specifically include (unless sender is ignored) """ - forgotten = yield preserve_context_over_deferred(defer.gatherResults([ + forgotten = yield make_deferred_yieldable(defer.gatherResults([ defer.maybeDeferred( preserve_fn(store.who_forgot_in_room), room_id, From 4dd1bfa8c18a3dc9df934a61771ae1e85b313b7e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 13 Nov 2017 10:30:38 +0000 Subject: [PATCH 17/65] Revert "Revert "move _state_group_cache to statestore"" We're going to fix this properly on this branch, so that the _state_group_cache can end up in StateGroupReadStore. This reverts commit ab335edb023d66cd0be439e045b10ca104b73cb5. --- synapse/storage/_base.py | 6 ------ synapse/storage/state.py | 19 ++++++++++++------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e94917d9cd..7ebd4f189d 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -16,8 +16,6 @@ import logging from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext -from synapse.util.caches import CACHE_SIZE_FACTOR -from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.caches.descriptors import Cache from synapse.storage.engines import PostgresEngine import synapse.metrics @@ -180,10 +178,6 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, max_entries=hs.config.event_cache_size) - self._state_group_cache = DictionaryCache( - "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR - ) - self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 diff --git a/synapse/storage/state.py b/synapse/storage/state.py index dd01b68762..ee3496123e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -13,16 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached, cachedList -from synapse.util.caches import intern_string -from synapse.util.stringutils import to_ascii -from synapse.storage.engines import PostgresEngine +from collections import namedtuple +import logging from twisted.internet import defer -from collections import namedtuple -import logging +from synapse.storage.engines import PostgresEngine +from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR +from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches.dictionary_cache import DictionaryCache +from synapse.util.stringutils import to_ascii +from ._base import SQLBaseStore logger = logging.getLogger(__name__) @@ -81,6 +82,10 @@ class StateStore(SQLBaseStore): where_clause="type='m.room.member'", ) + self._state_group_cache = DictionaryCache( + "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR + ) + @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): """Get the current state event ids for a room based on the From 35a4b632405be2ca91039f63a8c9c550f0f44ea3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Nov 2017 19:00:20 +0000 Subject: [PATCH 18/65] Pull out bits of StateStore to a mixin ... so that we don't need to secretly gut-wrench it for use in the slaved stores. I haven't done the other stores yet, but we should. I'm tired of the workers breaking every time we tweak the stores because I forgot to gut-wrench the right method. fixes https://github.com/matrix-org/synapse/issues/2655. --- synapse/replication/slave/storage/events.py | 39 +- synapse/storage/state.py | 424 ++++++++++---------- 2 files changed, 226 insertions(+), 237 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 94ebbffc1b..29d7296b43 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -12,20 +12,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker +import logging from synapse.api.constants import EventTypes from synapse.storage import DataStore -from synapse.storage.roommember import RoomMemberStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.event_push_actions import EventPushActionsStore -from synapse.storage.state import StateStore +from synapse.storage.roommember import RoomMemberStore +from synapse.storage.state import StateGroupReadStore from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache - -import logging - +from ._base import BaseSlavedStore +from ._slaved_id_tracker import SlavedIdTracker logger = logging.getLogger(__name__) @@ -39,7 +37,7 @@ logger = logging.getLogger(__name__) # the method descriptor on the DataStore and chuck them into our class. -class SlavedEventStore(BaseSlavedStore): +class SlavedEventStore(StateGroupReadStore, BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedEventStore, self).__init__(db_conn, hs) @@ -90,25 +88,9 @@ class SlavedEventStore(BaseSlavedStore): _get_unread_counts_by_pos_txn = ( DataStore._get_unread_counts_by_pos_txn.__func__ ) - _get_state_group_for_events = ( - StateStore.__dict__["_get_state_group_for_events"] - ) - _get_state_group_for_event = ( - StateStore.__dict__["_get_state_group_for_event"] - ) - _get_state_groups_from_groups = ( - StateStore.__dict__["_get_state_groups_from_groups"] - ) - _get_state_groups_from_groups_txn = ( - DataStore._get_state_groups_from_groups_txn.__func__ - ) get_recent_event_ids_for_room = ( StreamStore.__dict__["get_recent_event_ids_for_room"] ) - get_current_state_ids = ( - StateStore.__dict__["get_current_state_ids"] - ) - get_state_group_delta = StateStore.__dict__["get_state_group_delta"] _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] has_room_changed_since = DataStore.has_room_changed_since.__func__ @@ -134,12 +116,6 @@ class SlavedEventStore(BaseSlavedStore): DataStore.get_room_events_stream_for_room.__func__ ) get_events_around = DataStore.get_events_around.__func__ - get_state_for_event = DataStore.get_state_for_event.__func__ - get_state_for_events = DataStore.get_state_for_events.__func__ - get_state_groups = DataStore.get_state_groups.__func__ - get_state_groups_ids = DataStore.get_state_groups_ids.__func__ - get_state_ids_for_event = DataStore.get_state_ids_for_event.__func__ - get_state_ids_for_events = DataStore.get_state_ids_for_events.__func__ get_joined_users_from_state = DataStore.get_joined_users_from_state.__func__ get_joined_users_from_context = DataStore.get_joined_users_from_context.__func__ _get_joined_users_from_context = ( @@ -169,10 +145,7 @@ class SlavedEventStore(BaseSlavedStore): _get_rooms_for_user_where_membership_is_txn = ( DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) - _get_state_for_groups = DataStore._get_state_for_groups.__func__ - _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ _get_events_around_txn = DataStore._get_events_around_txn.__func__ - _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ get_backfill_events = DataStore.get_backfill_events.__func__ _get_backfill_events = DataStore._get_backfill_events.__func__ diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ee3496123e..360e3e4355 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -18,6 +18,7 @@ import logging from twisted.internet import defer +from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import PostgresEngine from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR from synapse.util.caches.descriptors import cached, cachedList @@ -41,23 +42,11 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt return len(self.delta_ids) if self.delta_ids else 0 -class StateStore(SQLBaseStore): - """ Keeps track of the state at a given event. +class StateGroupReadStore(SQLBaseStore): + """The read-only parts of StateGroupStore - This is done by the concept of `state groups`. Every event is a assigned - a state group (identified by an arbitrary string), which references a - collection of state events. The current state of an event is then the - collection of state events referenced by the event's state group. - - Hence, every change in the current state causes a new state group to be - generated. However, if no change happens (e.g., if we get a message event - with only one parent it inherits the state group from its parent.) - - There are three tables: - * `state_groups`: Stores group name, first event with in the group and - room id. - * `event_to_state_groups`: Maps events to state groups. - * `state_groups_state`: Maps state group to state events. + None of these functions write to the state tables, so are suitable for + including in the SlavedStores. """ STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" @@ -65,22 +54,7 @@ class StateStore(SQLBaseStore): CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" def __init__(self, db_conn, hs): - super(StateStore, self).__init__(db_conn, hs) - self.register_background_update_handler( - self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, - self._background_deduplicate_state, - ) - self.register_background_update_handler( - self.STATE_GROUP_INDEX_UPDATE_NAME, - self._background_index_state, - ) - self.register_background_index_update( - self.CURRENT_STATE_INDEX_UPDATE_NAME, - index_name="current_state_events_member_index", - table="current_state_events", - columns=["state_key"], - where_clause="type='m.room.member'", - ) + super(StateGroupReadStore, self).__init__(db_conn, hs) self._state_group_cache = DictionaryCache( "*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR @@ -195,178 +169,6 @@ class StateStore(SQLBaseStore): for group, event_id_map in group_to_ids.iteritems() }) - def _have_persisted_state_group_txn(self, txn, state_group): - txn.execute( - "SELECT count(*) FROM state_groups WHERE id = ?", - (state_group,) - ) - row = txn.fetchone() - return row and row[0] - - def _store_mult_state_groups_txn(self, txn, events_and_contexts): - state_groups = {} - for event, context in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue - - if context.current_state_ids is None: - # AFAIK, this can never happen - logger.error( - "Non-outlier event %s had current_state_ids==None", - event.event_id) - continue - - # if the event was rejected, just give it the same state as its - # predecessor. - if context.rejected: - state_groups[event.event_id] = context.prev_group - continue - - state_groups[event.event_id] = context.state_group - - if self._have_persisted_state_group_txn(txn, context.state_group): - continue - - self._simple_insert_txn( - txn, - table="state_groups", - values={ - "id": context.state_group, - "room_id": event.room_id, - "event_id": event.event_id, - }, - ) - - # We persist as a delta if we can, while also ensuring the chain - # of deltas isn't tooo long, as otherwise read performance degrades. - if context.prev_group: - is_in_db = self._simple_select_one_onecol_txn( - txn, - table="state_groups", - keyvalues={"id": context.prev_group}, - retcol="id", - allow_none=True, - ) - if not is_in_db: - raise Exception( - "Trying to persist state with unpersisted prev_group: %r" - % (context.prev_group,) - ) - - potential_hops = self._count_state_group_hops_txn( - txn, context.prev_group - ) - if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS: - self._simple_insert_txn( - txn, - table="state_group_edges", - values={ - "state_group": context.state_group, - "prev_state_group": context.prev_group, - }, - ) - - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.delta_ids.iteritems() - ], - ) - else: - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.current_state_ids.iteritems() - ], - ) - - # Prefill the state group cache with this group. - # It's fine to use the sequence like this as the state group map - # is immutable. (If the map wasn't immutable then this prefill could - # race with another update) - txn.call_after( - self._state_group_cache.update, - self._state_group_cache.sequence, - key=context.state_group, - value=dict(context.current_state_ids), - full=True, - ) - - self._simple_insert_many_txn( - txn, - table="event_to_state_groups", - values=[ - { - "state_group": state_group_id, - "event_id": event_id, - } - for event_id, state_group_id in state_groups.iteritems() - ], - ) - - for event_id, state_group_id in state_groups.iteritems(): - txn.call_after( - self._get_state_group_for_event.prefill, - (event_id,), state_group_id - ) - - def _count_state_group_hops_txn(self, txn, state_group): - """Given a state group, count how many hops there are in the tree. - - This is used to ensure the delta chains don't get too long. - """ - if isinstance(self.database_engine, PostgresEngine): - sql = (""" - WITH RECURSIVE state(state_group) AS ( - VALUES(?::bigint) - UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s - WHERE s.state_group = e.state_group - ) - SELECT count(*) FROM state; - """) - - txn.execute(sql, (state_group,)) - row = txn.fetchone() - if row and row[0]: - return row[0] - else: - return 0 - else: - # We don't use WITH RECURSIVE on sqlite3 as there are distributions - # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) - next_group = state_group - count = 0 - - while next_group: - next_group = self._simple_select_one_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"state_group": next_group}, - retcol="prev_state_group", - allow_none=True, - ) - if next_group: - count += 1 - - return count - @defer.inlineCallbacks def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) @@ -747,6 +549,220 @@ class StateStore(SQLBaseStore): defer.returnValue(results) + +class StateStore(StateGroupReadStore, BackgroundUpdateStore): + """ Keeps track of the state at a given event. + + This is done by the concept of `state groups`. Every event is a assigned + a state group (identified by an arbitrary string), which references a + collection of state events. The current state of an event is then the + collection of state events referenced by the event's state group. + + Hence, every change in the current state causes a new state group to be + generated. However, if no change happens (e.g., if we get a message event + with only one parent it inherits the state group from its parent.) + + There are three tables: + * `state_groups`: Stores group name, first event with in the group and + room id. + * `event_to_state_groups`: Maps events to state groups. + * `state_groups_state`: Maps state group to state events. + """ + + STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" + STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index" + CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx" + + def __init__(self, db_conn, hs): + super(StateStore, self).__init__(db_conn, hs) + self.register_background_update_handler( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, + self._background_deduplicate_state, + ) + self.register_background_update_handler( + self.STATE_GROUP_INDEX_UPDATE_NAME, + self._background_index_state, + ) + self.register_background_index_update( + self.CURRENT_STATE_INDEX_UPDATE_NAME, + index_name="current_state_events_member_index", + table="current_state_events", + columns=["state_key"], + where_clause="type='m.room.member'", + ) + + def _have_persisted_state_group_txn(self, txn, state_group): + txn.execute( + "SELECT count(*) FROM state_groups WHERE id = ?", + (state_group,) + ) + row = txn.fetchone() + return row and row[0] + + def _store_mult_state_groups_txn(self, txn, events_and_contexts): + state_groups = {} + for event, context in events_and_contexts: + if event.internal_metadata.is_outlier(): + continue + + if context.current_state_ids is None: + # AFAIK, this can never happen + logger.error( + "Non-outlier event %s had current_state_ids==None", + event.event_id) + continue + + # if the event was rejected, just give it the same state as its + # predecessor. + if context.rejected: + state_groups[event.event_id] = context.prev_group + continue + + state_groups[event.event_id] = context.state_group + + if self._have_persisted_state_group_txn(txn, context.state_group): + continue + + self._simple_insert_txn( + txn, + table="state_groups", + values={ + "id": context.state_group, + "room_id": event.room_id, + "event_id": event.event_id, + }, + ) + + # We persist as a delta if we can, while also ensuring the chain + # of deltas isn't tooo long, as otherwise read performance degrades. + if context.prev_group: + is_in_db = self._simple_select_one_onecol_txn( + txn, + table="state_groups", + keyvalues={"id": context.prev_group}, + retcol="id", + allow_none=True, + ) + if not is_in_db: + raise Exception( + "Trying to persist state with unpersisted prev_group: %r" + % (context.prev_group,) + ) + + potential_hops = self._count_state_group_hops_txn( + txn, context.prev_group + ) + if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ + "state_group": context.state_group, + "prev_state_group": context.prev_group, + }, + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.iteritems() + ], + ) + else: + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.current_state_ids.iteritems() + ], + ) + + # Prefill the state group cache with this group. + # It's fine to use the sequence like this as the state group map + # is immutable. (If the map wasn't immutable then this prefill could + # race with another update) + txn.call_after( + self._state_group_cache.update, + self._state_group_cache.sequence, + key=context.state_group, + value=dict(context.current_state_ids), + full=True, + ) + + self._simple_insert_many_txn( + txn, + table="event_to_state_groups", + values=[ + { + "state_group": state_group_id, + "event_id": event_id, + } + for event_id, state_group_id in state_groups.iteritems() + ], + ) + + for event_id, state_group_id in state_groups.iteritems(): + txn.call_after( + self._get_state_group_for_event.prefill, + (event_id,), state_group_id + ) + + def _count_state_group_hops_txn(self, txn, state_group): + """Given a state group, count how many hops there are in the tree. + + This is used to ensure the delta chains don't get too long. + """ + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT count(*) FROM state; + """) + + txn.execute(sql, (state_group,)) + row = txn.fetchone() + if row and row[0]: + return row[0] + else: + return 0 + else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) + next_group = state_group + count = 0 + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + count += 1 + + return count + def get_next_state_group(self): return self._state_groups_id_gen.get_next() From 03feb7a34d0496bc3f9cc350e74fde4cd0c38a17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Nov 2017 14:51:25 +0000 Subject: [PATCH 19/65] Bump version and changelog --- CHANGES.rst | 50 +++++++++++++++++++++++++++++++++++++++++++++ synapse/__init__.py | 2 +- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 4911cfa284..8e84323079 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,53 @@ +Changes in synapse v0.25.0-rc1 (2017-11-14) +=========================================== + +Features: + +* Add is_public to groups table to allow for private groups (PR #2582) +* Add a route for determining who you are (PR #2668) Thanks to @turt2live! +* Add more features to the password providers (PR #2608, #2610, #2620, #2622, + #2623, #2624, #2626, #2628, #2629) +* Add a hook for custom rest endpoints (PR #2627) +* Add API to update group room visibility (PR #2651) + + +Changes: + +* Ignore