Persist auth/state events at backwards extremities when we fetch them (#6526)
* commit 'ff773ff72': Persist auth/state events at backwards extremities when we fetch them (#6526)
This commit is contained in:
@@ -65,7 +65,6 @@ from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRes
|
||||
from synapse.state import StateResolutionStore, resolve_events_with_store
|
||||
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util import batch_iter, unwrapFirstError
|
||||
from synapse.util.async_helpers import Linearizer, concurrently_execute
|
||||
from synapse.util.distributor import user_joined_room
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
@@ -238,7 +237,6 @@ class FederationHandler(BaseHandler):
|
||||
return None
|
||||
|
||||
state = None
|
||||
auth_chain = []
|
||||
|
||||
# Get missing pdus if necessary.
|
||||
if not pdu.internal_metadata.is_outlier():
|
||||
@@ -342,7 +340,6 @@ class FederationHandler(BaseHandler):
|
||||
|
||||
# Calculate the state after each of the previous events, and
|
||||
# resolve them to find the correct state at the current event.
|
||||
auth_chains = set()
|
||||
event_map = {event_id: pdu}
|
||||
try:
|
||||
# Get the state of the events we know about
|
||||
@@ -366,24 +363,14 @@ class FederationHandler(BaseHandler):
|
||||
p,
|
||||
)
|
||||
|
||||
room_version = await self.store.get_room_version(room_id)
|
||||
|
||||
with nested_logging_context(p):
|
||||
# note that if any of the missing prevs share missing state or
|
||||
# auth events, the requests to fetch those events are deduped
|
||||
# by the get_pdu_cache in federation_client.
|
||||
(
|
||||
remote_state,
|
||||
got_auth_chain,
|
||||
) = await self._get_state_for_room(
|
||||
(remote_state, _,) = await self._get_state_for_room(
|
||||
origin, room_id, p, include_event_in_state=True
|
||||
)
|
||||
|
||||
# XXX hrm I'm not convinced that duplicate events will compare
|
||||
# for equality, so I'm not sure this does what the author
|
||||
# hoped.
|
||||
auth_chains.update(got_auth_chain)
|
||||
|
||||
remote_state_map = {
|
||||
(x.type, x.state_key): x.event_id for x in remote_state
|
||||
}
|
||||
@@ -392,6 +379,7 @@ class FederationHandler(BaseHandler):
|
||||
for x in remote_state:
|
||||
event_map[x.event_id] = x
|
||||
|
||||
room_version = await self.store.get_room_version(room_id)
|
||||
state_map = await resolve_events_with_store(
|
||||
room_id,
|
||||
room_version,
|
||||
@@ -413,7 +401,6 @@ class FederationHandler(BaseHandler):
|
||||
event_map.update(evs)
|
||||
|
||||
state = [event_map[e] for e in six.itervalues(state_map)]
|
||||
auth_chain = list(auth_chains)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"[%s %s] Error attempting to resolve state at missing "
|
||||
@@ -429,9 +416,7 @@ class FederationHandler(BaseHandler):
|
||||
affected=event_id,
|
||||
)
|
||||
|
||||
await self._process_received_pdu(
|
||||
origin, pdu, state=state, auth_chain=auth_chain
|
||||
)
|
||||
await self._process_received_pdu(origin, pdu, state=state)
|
||||
|
||||
async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
|
||||
"""
|
||||
@@ -633,6 +618,8 @@ class FederationHandler(BaseHandler):
|
||||
room_id (str)
|
||||
event_ids (Iterable[str])
|
||||
|
||||
Persists any events we don't already have as outliers.
|
||||
|
||||
If we fail to fetch any of the events, a warning will be logged, and the event
|
||||
will be omitted from the result. Likewise, any events which turn out not to
|
||||
be in the given room.
|
||||
@@ -652,27 +639,15 @@ class FederationHandler(BaseHandler):
|
||||
room_id,
|
||||
)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
yield self._get_events_and_persist(
|
||||
destination=destination, room_id=room_id, events=missing_events
|
||||
)
|
||||
|
||||
# XXX 20 requests at once? really?
|
||||
for batch in batch_iter(missing_events, 20):
|
||||
deferreds = [
|
||||
run_in_background(
|
||||
self.federation_client.get_pdu,
|
||||
destinations=[destination],
|
||||
event_id=e_id,
|
||||
room_version=room_version,
|
||||
)
|
||||
for e_id in batch
|
||||
]
|
||||
|
||||
res = yield make_deferred_yieldable(
|
||||
defer.DeferredList(deferreds, consumeErrors=True)
|
||||
)
|
||||
|
||||
for success, result in res:
|
||||
if success and result:
|
||||
fetched_events[result.event_id] = result
|
||||
# we need to make sure we re-load from the database to get the rejected
|
||||
# state correct.
|
||||
fetched_events.update(
|
||||
(yield self.store.get_events(missing_events, allow_rejected=True))
|
||||
)
|
||||
|
||||
# check for events which were in the wrong room.
|
||||
#
|
||||
@@ -702,7 +677,7 @@ class FederationHandler(BaseHandler):
|
||||
return fetched_events
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _process_received_pdu(self, origin, event, state, auth_chain):
|
||||
def _process_received_pdu(self, origin, event, state):
|
||||
""" Called when we have a new pdu. We need to do auth checks and put it
|
||||
through the StateHandler.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user