Refactor get_events_from_store_or_dest to return a dict (#6501)
* commit '4c7b1bb6c': Refactor get_events_from_store_or_dest to return a dict (#6501)
This commit is contained in:
@@ -39,7 +39,7 @@ from synapse.events import builder, room_version_to_event_format
|
||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||
from synapse.logging.context import make_deferred_yieldable
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util import batch_iter, unwrapFirstError
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
||||
@@ -327,7 +327,74 @@ class FederationClient(FederationBase):
|
||||
):
|
||||
raise Exception("invalid response from /state_ids")
|
||||
|
||||
return state_event_ids, auth_event_ids
|
||||
desired_events = set(state_event_ids + auth_event_ids)
|
||||
event_map = yield self.get_events_from_store_or_dest(
|
||||
destination, room_id, desired_events
|
||||
)
|
||||
|
||||
failed_to_fetch = desired_events - event_map.keys()
|
||||
if failed_to_fetch:
|
||||
logger.warning(
|
||||
"Failed to fetch missing state/auth events for %s: %s",
|
||||
room_id,
|
||||
failed_to_fetch,
|
||||
)
|
||||
|
||||
pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
|
||||
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
|
||||
|
||||
auth_chain.sort(key=lambda e: e.depth)
|
||||
|
||||
return pdus, auth_chain
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_events_from_store_or_dest(self, destination, room_id, event_ids):
|
||||
"""Fetch events from a remote destination, checking if we already have them.
|
||||
|
||||
Args:
|
||||
destination (str)
|
||||
room_id (str)
|
||||
event_ids (Iterable[str])
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, EventBase]]: A deferred resolving to a map
|
||||
from event_id to event
|
||||
"""
|
||||
fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)
|
||||
|
||||
missing_events = set(event_ids) - fetched_events.keys()
|
||||
|
||||
if not missing_events:
|
||||
return fetched_events
|
||||
|
||||
logger.debug(
|
||||
"Fetching unknown state/auth events %s for room %s",
|
||||
missing_events,
|
||||
event_ids,
|
||||
)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
|
||||
# XXX 20 requests at once? really?
|
||||
for batch in batch_iter(missing_events, 20):
|
||||
deferreds = [
|
||||
run_in_background(
|
||||
self.get_pdu,
|
||||
destinations=[destination],
|
||||
event_id=e_id,
|
||||
room_version=room_version,
|
||||
)
|
||||
for e_id in batch
|
||||
]
|
||||
|
||||
res = yield make_deferred_yieldable(
|
||||
defer.DeferredList(deferreds, consumeErrors=True)
|
||||
)
|
||||
for success, result in res:
|
||||
if success and result:
|
||||
fetched_events[result.event_id] = result
|
||||
|
||||
return fetched_events
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
|
||||
Reference in New Issue
Block a user