diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 66d3da8719..ab2ed53bce 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -648,6 +648,98 @@ class FederationEventHandler: logger.info("Got %d prev_events", len(missing_events)) await self._process_pulled_events(origin, missing_events, backfilled=False) + async def generateEventIdGraphFromEvents( + self, events: Iterable[EventBase] + ) -> Dict[str, Iterable[str]]: + event_map = {event.event_id: event for event in events} + + # Since the insertion event we try to reference later on might be in the + # backfill chunk itself, we need to make it easy to look up. Maps a given + # batch_id to the insertion event. + batch_id_map = { + event.content.get( + EventContentFields.MSC2716_NEXT_BATCH_ID, None + ): event.event_id + for event in events + if event.type == EventTypes.MSC2716_INSERTION + } + + # Map a given event to its successors (backwards prev_events) + successor_event_id_map = {} + for event in events: + for prev_event_id in event.prev_event_ids(): + successor_event_id_map.setdefault(prev_event_id, []).append( + event.event_id + ) + + event_id_graph = {} + for event in events: + # Assign the real edges to the graph. + # Make a copy so we don't modify the actual prev_events when we extend them below. + event_id_graph.setdefault(event.event_id, []).extend( + event.prev_event_ids().copy() + ) + + # We need to make some fake edge connections from the batch event at + # the bottom of the historical batch to the insertion event. This + # way the historical batch topologically sorts ahead-in-time of + # the event we branched off of. 
+ batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID, None) + if event.type == EventTypes.MSC2716_BATCH and batch_id: + # Maybe we can get lucky and save ourselves a lookup + # by checking the events in the backfill first + insertion_event_id = batch_id_map[ + batch_id + ] or await self._store.get_insertion_event_id_by_batch_id( + event.room_id, batch_id + ) + + if insertion_event_id: + # Connect the insertion event via a fake edge pointing to the + # batch event so the historical batch topologically sorts + # behind-in-time the insertion event. + event_id_graph.setdefault(insertion_event_id, []).append( + event.event_id + ) + + # Maybe we can get lucky and save ourselves a lookup + # by checking the events in the backfill first + insertion_event = event_map[ + insertion_event_id + ] or await self._store.get_event( + insertion_event_id, allow_none=True + ) + + if insertion_event: + # Connect the insertion events' `prev_event` successors + # via fake edges pointing to the insertion event itself + # so the insertion event sorts topologically + # behind-in-time the successor. Nestled perfectly + # between the prev_event and the successor. 
+ for insertion_prev_event_id in insertion_event.prev_event_ids(): + successor_event_ids = successor_event_id_map[ + insertion_prev_event_id + ] + logger.info( + "insertion_event_id=%s successor_event_ids=%s", + insertion_event_id, + successor_event_ids, + ) + if successor_event_ids: + for successor_event_id in successor_event_ids: + # Don't add itself back as a successor + if successor_event_id != insertion_event_id: + # Fake edge to point the successor back + # at the insertion event + event_id_graph.setdefault( + successor_event_id, [] + ).append(insertion_event_id) + + # TODO: We also need to add fake edges to connect the oldest-in-time messages + # in the batch to the event we branched off of, see https://github.com/matrix-org/synapse/pull/11114#discussion_r739300985 + + return event_id_graph + async def _process_pulled_events( self, origin: str, events: Iterable[EventBase], backfilled: bool ) -> None: @@ -683,95 +775,12 @@ class FederationEventHandler: # tell clients about them in order. # sorted_events = sorted(events, key=lambda x: x.depth) - event_ids = [event.event_id for event in events] - event_map = {event.event_id: event for event in events} - - # Since the insertion event we try to reference later on might be in the - # backfill chunk itself, we need to make it easy to lookup. Maps a given - # batch_id to the insertion event. - batch_id_map = { - event.content.get( - EventContentFields.MSC2716_NEXT_BATCH_ID, None - ): event.event_id - for event in events - if event.type == EventTypes.MSC2716_INSERTION - } - - successor_event_id_map = {} - for event in events: - for prev_event_id in event.prev_event_ids(): - successor_event_id_map.setdefault(prev_event_id, []).append( - event.event_id - ) - - event_id_graph = {} - for event in events: - # Assign the real edges to the graph. - # Make a copy so we don't modify the actual prev_events when we extend them below. 
- event_id_graph.setdefault(event.event_id, []).extend( - event.prev_event_ids().copy() - ) - - # We need to make some fake edge connections from the batch event at - # the bottom of the historical batch to the insertion event. This - # way the historical batch topologically sorts in ahead-in-time of - # the event we branched off of. - batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID, None) - if event.type == EventTypes.MSC2716_BATCH and batch_id: - # Maybe we can get lucky and save ourselves a lookup - # by checking the events in the backfill first - insertion_event_id = batch_id_map[ - batch_id - ] or await self._store.get_insertion_event_id_by_batch_id( - event.room_id, batch_id - ) - - if insertion_event_id: - # Add the insertion event as a fake edge connection to the batch - # event so the historical batch topologically sorts below - # the "live" event we branched off of. - event_id_graph.setdefault(event.event_id, []).append( - insertion_event_id - ) - - # Maybe we can get lucky and save ourselves a lookup - # by checking the events in the backfill first - insertion_event = event_map[ - insertion_event_id - ] or await self._store.get_event( - insertion_event_id, allow_none=True - ) - - if insertion_event: - # Also add some fake edges to connect the insertion - # event to it's prev_event successors so it sorts - # topologically behind-in-time the successor. Nestled - # perfectly between the prev_event and the successor. 
- for insertion_prev_event_id in insertion_event.prev_event_ids(): - successor_event_ids = successor_event_id_map[ - insertion_prev_event_id - ] - logger.info( - "insertion_event_id=%s successor_event_ids=%s", - insertion_event_id, - successor_event_ids, - ) - if successor_event_ids: - - event_id_graph.setdefault( - insertion_event_id, [] - ).extend( - [ - successor_event_id - for successor_event_id in successor_event_ids - # Don't add itself back as a successor - if successor_event_id != insertion_event_id - ] - ) - # We want to sort topologically so we process them and tell clients # about them in order. sorted_events = [] + event_ids = [event.event_id for event in events] + event_map = {event.event_id: event for event in events} + event_id_graph = await self.generateEventIdGraphFromEvents(events) for event_id in sorted_topologically(event_ids, event_id_graph): sorted_events.append(event_map[event_id]) sorted_events = reversed(sorted_events)