1
0

Merge branch 'erikj/thread_demo' of github.com:matrix-org/synapse into erikj/add_routing_hooks

This commit is contained in:
Erik Johnston
2018-11-21 11:45:11 +00:00
5 changed files with 70 additions and 27 deletions
+19 -2
View File
@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
import six
from six import iteritems
@@ -70,6 +71,7 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self.clock = hs.get_clock()
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -207,12 +209,26 @@ class FederationServer(FederationBase):
pdu_results[event_id] = e.error_dict()
return
thread_id = random.randint(1, 999999999)
pdu_to_thread = {}
first_in_thread = True
for pdu in reversed(pdus_by_room[room_id]):
now = self.clock.time_msec()
if now - pdu.origin_server_ts > 1 * 60 * 1000:
pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
first_in_thread = False
else:
pdu_to_thread[pdu.event_id] = (0, False)
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
with nested_logging_context(event_id):
thread_id, new_thread = pdu_to_thread[pdu.event_id]
logger.info("Assigning thread %d to %s", thread_id, pdu.event_id)
try:
yield self._handle_received_pdu(
origin, pdu
origin, pdu, thread_id=thread_id,
new_thread=new_thread
)
pdu_results[event_id] = {}
except FederationError as e:
@@ -570,7 +586,7 @@ class FederationServer(FederationBase):
)
@defer.inlineCallbacks
def _handle_received_pdu(self, origin, pdu):
def _handle_received_pdu(self, origin, pdu, thread_id, new_thread):
""" Process a PDU received in a federation /send/ transaction.
If the event is invalid, then this method throws a FederationError.
@@ -613,6 +629,7 @@ class FederationServer(FederationBase):
yield self.handler.on_receive_pdu(
origin, pdu, sent_to_us_directly=True,
thread_id=thread_id, new_thread=new_thread,
)
def __str__(self):
+2 -1
View File
@@ -174,7 +174,8 @@ class TransportLayerClient(object):
path=path,
data=json_data,
json_data_callback=json_data_callback,
long_retries=True,
long_retries=False,
timeout=10000,
backoff_on_404=True, # If we get a 404 the other side has gone
)
+19 -15
View File
@@ -140,6 +140,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def on_receive_pdu(
self, origin, pdu, sent_to_us_directly=False, thread_id=None,
new_thread=False,
):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -235,12 +236,6 @@ class FederationHandler(BaseHandler):
state = None
auth_chain = []
new_thread = False
if thread_id is None:
# FIXME: Pick something better?
thread_id = random.randint(0, 999999999)
new_thread = True
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
@@ -434,13 +429,6 @@ class FederationHandler(BaseHandler):
affected=event_id,
)
now = self.clock.time_msec()
if now - pdu.origin_server_ts > 2 * 60 * 1000:
pass
else:
thread_id = 0
new_thread = False
logger.info("Thread ID %r", thread_id)
yield self._process_received_pdu(
@@ -472,7 +460,7 @@ class FederationHandler(BaseHandler):
create_requester(UserID("server", "server")),
event,
context,
ratelimit=True,
ratelimit=False,
extra_users=[],
do_auth=False,
)
@@ -574,6 +562,21 @@ class FederationHandler(BaseHandler):
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
pdu_to_thread = {}
if not thread_id:
thread_id = random.randint(1, 999999999)
first_in_thread = True
for pdu in reversed(missing_events):
now = self.clock.time_msec()
if now - pdu.origin_server_ts > 1 * 60 * 1000:
pdu_to_thread[pdu.event_id] = (thread_id, first_in_thread)
first_in_thread = False
else:
pdu_to_thread[pdu.event_id] = (0, False)
else:
for pdu in reversed(missing_events):
pdu_to_thread[pdu.event_id] = (thread_id, False)
for ev in missing_events:
logger.info(
"[%s %s] Handling received prev_event %s",
@@ -585,7 +588,8 @@ class FederationHandler(BaseHandler):
origin,
ev,
sent_to_us_directly=False,
thread_id=thread_id,
thread_id=pdu_to_thread[ev.event_id][0],
new_thread=pdu_to_thread[ev.event_id][1],
)
except FederationError as e:
if e.code == 403:
+10 -9
View File
@@ -196,7 +196,7 @@ class MatrixFederationHttpClient(object):
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.version_string_bytes = hs.version_string.encode('ascii')
self.default_timeout = 60
self.default_timeout = 30
def schedule(x):
reactor.callLater(_EPSILON, x)
@@ -253,13 +253,13 @@ class MatrixFederationHttpClient(object):
):
raise FederationDeniedError(request.destination)
limiter = yield synapse.util.retryutils.get_retry_limiter(
request.destination,
self.clock,
self._store,
backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
)
# limiter = yield synapse.util.retryutils.get_retry_limiter(
# request.destination,
# self.clock,
# self._store,
# backoff_on_404=backoff_on_404,
# ignore_backoff=ignore_backoff,
# )
method_bytes = request.method.encode("ascii")
destination_bytes = request.destination.encode("ascii")
@@ -274,7 +274,8 @@ class MatrixFederationHttpClient(object):
b"Host": [destination_bytes],
}
with limiter:
# with limiter:
if True:
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
if long_retries:
@@ -0,0 +1,20 @@
/* Copyright 2018 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE events ADD COLUMN thread_id BIGINT NOT NULL DEFAULT 0;
CREATE INDEX events_room_idx ON events (room_id, thread_id);
-- CREATE SEQUENCE thread_id_seq;