diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index df03eeba00..957bffc68c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -905,9 +905,6 @@ class FederationEventHandler: with nested_logging_context(ev.event_id): await self._process_pulled_event(origin, ev, backfilled=backfilled) - if backfilled: - await self._assign_stitched_orderings(new_events) - # Check if we've already tried to process these events at some point in the # past. We aren't concerned with the exponential backoff here, just whether it # has failed to be processed before. @@ -2390,103 +2387,6 @@ class FederationEventHandler: ) raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events") - async def _assign_stitched_orderings( - self, - events: List[EventBase], - ) -> None: - """ - Update persisted backfilled events with a stitched order - """ - room_id = events[0].room_id - - # Map from event id to assigned stitched ordering - stitched_orderings: Dict[str, int] = {} - - # Take a copy of the events we have to process - remaining_batch = list(events) - - # Find all events in the current batch which are in a timeline gap - gap_events = await self._store.db_pool.simple_select_many_batch( - "event_backward_extremities", - "event_id", - (ev.event_id for ev in events), - ["event_id", "before_gap_event_id"], - ) - - # TODO sort gap_events by DAG;received order - for gap_event, before_gap_event_id in gap_events: - logger.debug("Processing received gap event %s", gap_event) - - matching_events = [gap_event] # TODO find other events in the same gap - - # Find all predecessors of those events in the batch - to_insert = find_predecessors(matching_events, remaining_batch) - - logger.debug("Processing to_insert set %s", to_insert) - - # Find the stitched order of the event before the gap - # TODO consider doing this with a join - previous_event_stitched_order = ( - await self._store.db_pool.simple_select_one_onecol( - "events", - {"event_id": before_gap_event_id}, - "stitched_ordering", - True, - ) - ) - - logger.debug( - "Previous event stitched_ordering = %i", previous_event_stitched_order - ) - - # if previous_event_stitched_order is None, that means we have a room - # where there are existing events or gaps without assigned stitched orders. - # Let's give up trying to assign stitched orders here. - if previous_event_stitched_order is None: - # TODO do something better here - logger.warning( - "Found gap event %s without assigned stitched order: bailing", - gap_event, - ) - return - - still_remaining_batch = [] - for event, context in remaining_batch: - if event.event_id not in to_insert: - still_remaining_batch.append((event, context)) - continue - - # TODO we may need to reorder existing events - previous_event_stitched_order += 1 - stitched_orderings[event.event_id] = previous_event_stitched_order - - remaining_batch = still_remaining_batch - logger.debug( - "Remaining events: %s", [ev.event_id for ev in remaining_batch] - ) - - logger.debug( - "Remaining events after processing gap matches: %s", - [ev.event_id for ev in remaining_batch], - ) - - current_max_stream_ordering = ( - await self._store.get_room_max_stitched_ordering(room_id) or 0 - ) - - for event in remaining_batch: - current_max_stream_ordering += 2**16 - stitched_orderings[event.event_id] = current_max_stream_ordering - - logger.info("Updating stitched orderings for received backfilled events: %s", stitched_orderings) - - def update(txn: LoggingTransaction) -> None: - for event_id, ordering in stitched_orderings.items(): - txn.execute("UPDATE events SET stitched_ordering = ? WHERE event_id = ?", [ordering, event_id]) - - await self._store.db_pool.runInteraction("update_stitched_orderings", update) - - def find_predecessors( event_ids: Iterable[str], batch: List[EventBase] ) -> Set[str]: