diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 98722ae543..9e064b2e57 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random import six from six import iteritems @@ -71,6 +72,7 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() self.handler = hs.get_handlers().federation_handler + self.clock = hs.get_clock() self._server_linearizer = Linearizer("fed_server") self._transaction_linearizer = Linearizer("fed_txn_handler") @@ -208,12 +210,25 @@ class FederationServer(FederationBase): pdu_results[event_id] = e.error_dict() return + thread_id = random.randint(0, 999999999) + pdu_to_thread = {} + first_in_thread = True + for pdu in reversed(pdus_by_room[room_id]): + now = self.clock.time_msec() + if now - pdu.origin_server_ts > 1 * 60 * 1000: + pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread) + first_in_thread = False + else: + pdu_to_thread[pdu.event_id] = (0, False) + for pdu in pdus_by_room[room_id]: event_id = pdu.event_id with nested_logging_context(event_id): + thread_id, new_thread = pdu_to_thread[pdu.event_id] try: yield self._handle_received_pdu( - origin, pdu + origin, pdu, thread_id=thread_id, + new_thread=new_thread ) pdu_results[event_id] = {} except FederationError as e: @@ -571,7 +586,7 @@ class FederationServer(FederationBase): ) @defer.inlineCallbacks - def _handle_received_pdu(self, origin, pdu): + def _handle_received_pdu(self, origin, pdu, thread_id, new_thread): """ Process a PDU received in a federation /send/ transaction. If the event is invalid, then this method throws a FederationError. @@ -638,6 +653,7 @@ class FederationServer(FederationBase): yield self.handler.on_receive_pdu( origin, pdu, sent_to_us_directly=True, + thread_id=thread_id, new_thread=new_thread, ) def __str__(self): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c9fefeb087..4b37093a82 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -138,6 +138,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_receive_pdu( self, origin, pdu, sent_to_us_directly=False, thread_id=None, + new_thread=False, ): """ Process a PDU received via a federation /send/ transaction, or via backfill of missing prev_events @@ -461,7 +462,7 @@ class FederationHandler(BaseHandler): create_requester(UserID("server", "server")), event, context, - ratelimit=True, + ratelimit=False, extra_users=[], do_auth=False, ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index b54d4a7b89..d2ca39c71e 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -253,13 +253,13 @@ class MatrixFederationHttpClient(object): ): raise FederationDeniedError(request.destination) - limiter = yield synapse.util.retryutils.get_retry_limiter( - request.destination, - self.clock, - self._store, - backoff_on_404=backoff_on_404, - ignore_backoff=ignore_backoff, - ) + # limiter = yield synapse.util.retryutils.get_retry_limiter( + # request.destination, + # self.clock, + # self._store, + # backoff_on_404=backoff_on_404, + # ignore_backoff=ignore_backoff, + # ) method_bytes = request.method.encode("ascii") destination_bytes = request.destination.encode("ascii") @@ -274,7 +274,8 @@ class MatrixFederationHttpClient(object): b"Host": [destination_bytes], } - with limiter: + # with limiter: + if True: # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: