From e644f49b46e4c7a01631b5c0a5756e34fb2e1fb3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 19 Nov 2018 15:09:07 +0000 Subject: [PATCH 1/6] Delta file --- synapse/storage/schema/delta/52/thread_id.sql | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 synapse/storage/schema/delta/52/thread_id.sql diff --git a/synapse/storage/schema/delta/52/thread_id.sql b/synapse/storage/schema/delta/52/thread_id.sql new file mode 100644 index 0000000000..845529fdcd --- /dev/null +++ b/synapse/storage/schema/delta/52/thread_id.sql @@ -0,0 +1,20 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE events ADD COLUMN thread_id BIGINT NOT NULL DEFAULT 0; + +CREATE INDEX events_room_idx ON events (room_id, thread_id); + +-- CREATE SEQUENCE thread_id_seq; From 775441105a673378de9fc66eb3476aedc7ee3353 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Nov 2018 11:30:43 +0000 Subject: [PATCH 2/6] Reduce timeouts for sending transaction --- synapse/federation/transport/client.py | 2 +- synapse/http/matrixfederationclient.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index edba5a9808..5e8bf8ac76 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -174,7 +174,7 @@ class TransportLayerClient(object): path=path, data=json_data, json_data_callback=json_data_callback, - long_retries=True, + long_retries=False, backoff_on_404=True, # If we get a 404 the other side has gone ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 24b6110c20..b54d4a7b89 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -196,7 +196,7 @@ class MatrixFederationHttpClient(object): self.clock = hs.get_clock() self._store = hs.get_datastore() self.version_string_bytes = hs.version_string.encode('ascii') - self.default_timeout = 60 + self.default_timeout = 30 def schedule(x): reactor.callLater(_EPSILON, x) From 607ac7ea373a543f652c6d680b4b3d6ec545298b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Nov 2018 13:32:47 +0000 Subject: [PATCH 3/6] Lower all the timeouts --- synapse/federation/transport/client.py | 1 + synapse/handlers/federation.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 5e8bf8ac76..42ed61470f 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -175,6 +175,7 @@ class TransportLayerClient(object): data=json_data, json_data_callback=json_data_callback, long_retries=False, + timeout=10000, backoff_on_404=True, # If we get a 404 the other side has gone ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c37a3b8dca..c9fefeb087 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -424,7 +424,7 @@ class FederationHandler(BaseHandler): ) now = self.clock.time_msec() - if now - pdu.origin_server_ts > 2 * 60 * 1000: + if now - pdu.origin_server_ts > 1 * 60 * 1000: pass else: thread_id = 0 From 115e4bb4c6807b73bcc81dcfcb29d6e435a48bc7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Nov 2018 17:04:19 +0000 Subject: [PATCH 4/6] Fix threading --- synapse/federation/federation_server.py | 20 ++++++++++++++++++-- synapse/handlers/federation.py | 3 ++- synapse/http/matrixfederationclient.py | 17 +++++++++-------- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 98722ae543..9e064b2e57 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random import six from six import iteritems @@ -71,6 +72,7 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() self.handler = hs.get_handlers().federation_handler + self.clock = hs.get_clock() self._server_linearizer = Linearizer("fed_server") self._transaction_linearizer = Linearizer("fed_txn_handler") @@ -208,12 +210,25 @@ class FederationServer(FederationBase): pdu_results[event_id] = e.error_dict() return + thread_id = random.randint(0, 999999999) + pdu_to_thread = {} + first_in_thread = True + for pdu in reversed(pdus_by_room[room_id]): + now = self.clock.time_msec() + if now - pdu.origin_server_ts > 1 * 60 * 1000: + pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread) + first_in_thread = False + else: + pdu_to_thread[pdu.event_id] = (0, False) + for pdu in pdus_by_room[room_id]: event_id = pdu.event_id with nested_logging_context(event_id): + thread_id, new_thread = pdu_to_thread[pdu.event_id] try: yield self._handle_received_pdu( - origin, pdu + origin, pdu, thread_id=thread_id, + new_thread=new_thread ) pdu_results[event_id] = {} except FederationError as e: @@ -571,7 +586,7 @@ class FederationServer(FederationBase): ) @defer.inlineCallbacks - def _handle_received_pdu(self, origin, pdu): + def _handle_received_pdu(self, origin, pdu, thread_id, new_thread): """ Process a PDU received in a federation /send/ transaction. If the event is invalid, then this method throws a FederationError. @@ -638,6 +653,7 @@ class FederationServer(FederationBase): yield self.handler.on_receive_pdu( origin, pdu, sent_to_us_directly=True, + thread_id=thread_id, new_thread=new_thread, ) def __str__(self): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c9fefeb087..4b37093a82 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -138,6 +138,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_receive_pdu( self, origin, pdu, sent_to_us_directly=False, thread_id=None, + new_thread=False, ): """ Process a PDU received via a federation /send/ transaction, or via backfill of missing prev_events @@ -461,7 +462,7 @@ class FederationHandler(BaseHandler): create_requester(UserID("server", "server")), event, context, - ratelimit=True, + ratelimit=False, extra_users=[], do_auth=False, ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index b54d4a7b89..d2ca39c71e 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -253,13 +253,13 @@ class MatrixFederationHttpClient(object): ): raise FederationDeniedError(request.destination) - limiter = yield synapse.util.retryutils.get_retry_limiter( - request.destination, - self.clock, - self._store, - backoff_on_404=backoff_on_404, - ignore_backoff=ignore_backoff, - ) + # limiter = yield synapse.util.retryutils.get_retry_limiter( + # request.destination, + # self.clock, + # self._store, + # backoff_on_404=backoff_on_404, + # ignore_backoff=ignore_backoff, + # ) method_bytes = request.method.encode("ascii") destination_bytes = request.destination.encode("ascii") @@ -274,7 +274,8 @@ class MatrixFederationHttpClient(object): b"Host": [destination_bytes], } - with limiter: + # with limiter: + if True: # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: From 5ae1644d3d16baf55e7149ed9507b8254461c3e2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Nov 2018 17:42:43 +0000 Subject: [PATCH 5/6] Send down new thread marker --- synapse/handlers/federation.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4b37093a82..68e13673fd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -225,12 +225,6 @@ class FederationHandler(BaseHandler): state = None auth_chain = [] - new_thread = False - if thread_id is None: - # FIXME: Pick something better? - thread_id = random.randint(0, 999999999) - new_thread = True - # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. @@ -424,13 +418,6 @@ class FederationHandler(BaseHandler): affected=event_id, ) - now = self.clock.time_msec() - if now - pdu.origin_server_ts > 1 * 60 * 1000: - pass - else: - thread_id = 0 - new_thread = False - logger.info("Thread ID %r", thread_id) yield self._process_received_pdu( From d0d3c63705d72c051cb3e03ec7f717d5a6051179 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Nov 2018 10:45:35 +0000 Subject: [PATCH 6/6] Fix threading when pulling in via get_missing_events --- synapse/federation/federation_server.py | 3 ++- synapse/handlers/federation.py | 18 +++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9e064b2e57..605597acab 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -210,7 +210,7 @@ class FederationServer(FederationBase): pdu_results[event_id] = e.error_dict() return - thread_id = random.randint(0, 999999999) + thread_id = random.randint(1, 999999999) pdu_to_thread = {} first_in_thread = True for pdu in reversed(pdus_by_room[room_id]): @@ -225,6 +225,7 @@ class FederationServer(FederationBase): event_id = pdu.event_id with nested_logging_context(event_id): thread_id, new_thread = pdu_to_thread[pdu.event_id] + logger.info("Assigning thread %d to %s", thread_id, pdu.event_id) try: yield self._handle_received_pdu( origin, pdu, thread_id=thread_id, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 68e13673fd..9e470f8614 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -551,6 +551,21 @@ class FederationHandler(BaseHandler): # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) + pdu_to_thread = {} + if not thread_id: + thread_id = random.randint(1, 999999999) + first_in_thread = True + for pdu in reversed(missing_events): + now = self.clock.time_msec() + if now - pdu.origin_server_ts > 1 * 60 * 1000: + pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread) + first_in_thread = False + else: + pdu_to_thread[pdu.event_id] = (0, False) + else: + for pdu in reversed(missing_events): + pdu_to_thread[pdu.event_id] = (thread_id, False) + for ev in missing_events: logger.info( "[%s %s] Handling received prev_event %s", @@ -562,7 +577,8 @@ class FederationHandler(BaseHandler): origin, ev, sent_to_us_directly=False, - thread_id=thread_id, + thread_id=pdu_to_thread[ev.event_id][0], + new_thread=pdu_to_thread[ev.event_id][1], ) except FederationError as e: if e.code == 403: