From 596dfdb4b40bfa0e98d39ae4c3eaac81f57fbe7d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 20 Aug 2025 16:37:44 +0100 Subject: [PATCH] Update stitched_order of backfilled events after persisting --- synapse/handlers/federation_event.py | 133 +++++++++++++++++- synapse/storage/controllers/persist_events.py | 31 ++-- synapse/storage/databases/main/events.py | 81 +++++------ .../storage/databases/main/events_worker.py | 18 +++ 4 files changed, 201 insertions(+), 62 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 04ee774aa3..df03eeba00 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -86,6 +86,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ) from synapse.state import StateResolutionStore +from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( PersistedEventPosition, @@ -904,8 +905,11 @@ class FederationEventHandler: with nested_logging_context(ev.event_id): await self._process_pulled_event(origin, ev, backfilled=backfilled) + if backfilled: + await self._assign_stitched_orderings(new_events) + # Check if we've already tried to process these events at some point in the - # past. We aren't concerned with the expontntial backoff here, just whether it + # past. We aren't concerned with the exponential backoff here, just whether it # has failed to be processed before. event_ids_with_failed_pull_attempts = ( await self._store.get_event_ids_with_failed_pull_attempts( @@ -2385,3 +2389,130 @@ class FederationEventHandler: len(ev.auth_event_ids()), ) raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events") + + async def _assign_stitched_orderings( + self, + events: List[EventBase], + ) -> None: + """ + Update persisted backfilled events with a stitched order + """ + room_id = events[0].room_id + + # Map from event id to assigned stitched ordering + stitched_orderings: Dict[str, int] = {} + + # Take a copy of the events we have to process + remaining_batch = list(events) + + # Find all events in the current batch which are in a timeline gap + gap_events = await self._store.db_pool.simple_select_many_batch( + "event_backward_extremities", + "event_id", + (ev.event_id for ev in events), + ["event_id", "before_gap_event_id"], + ) + + # TODO sort gap_events by DAG;received order + for gap_event, before_gap_event_id in gap_events: + logger.debug("Processing received gap event %s", gap_event) + + matching_events = [gap_event] # TODO find other events in the same gap + + # Find all predecessors of those events in the batch + to_insert = find_predecessors(matching_events, remaining_batch) + + logger.debug("Processing to_insert set %s", to_insert) + + # Find the stitched order of the event before the gap + # TODO consider doing this with a join + previous_event_stitched_order = ( + await self._store.db_pool.simple_select_one_onecol( + "events", + {"event_id": before_gap_event_id}, + "stitched_ordering", + True, + ) + ) + + logger.debug( + "Previous event stitched_ordering = %i", previous_event_stitched_order + ) + + # if previous_event_stitched_order is None, that means we have a room + # where there are existing events or gaps without assigned stitched orders. + # Let's give up trying to assign stitched orders here. + if previous_event_stitched_order is None: + # TODO do something better here + logger.warning( + "Found gap event %s without assigned stitched order: bailing", + gap_event, + ) + return + + still_remaining_batch = [] + for event, context in remaining_batch: + if event.event_id not in to_insert: + still_remaining_batch.append((event, context)) + continue + + # TODO we may need to reorder existing events + previous_event_stitched_order += 1 + stitched_orderings[event.event_id] = previous_event_stitched_order + + remaining_batch = still_remaining_batch + logger.debug( + "Remaining events: %s", [ev.event_id for ev in remaining_batch] + ) + + logger.debug( + "Remaining events after processing gap matches: %s", + [ev.event_id for ev in remaining_batch], + ) + + current_max_stream_ordering = ( + await self._store.get_room_max_stitched_ordering(room_id) or 0 + ) + + for event in remaining_batch: + current_max_stream_ordering += 2**16 + stitched_orderings[event.event_id] = current_max_stream_ordering + + logger.info("Updating stitched orderings for received backfilled events: %s", stitched_orderings) + + def update(txn: LoggingTransaction) -> None: + for event_id, ordering in stitched_orderings.items(): + txn.execute("UPDATE events SET stitched_ordering = ? WHERE event_id = ?", [ordering, event_id]) + + await self._store.db_pool.runInteraction("update_stitched_orderings", update) + + +def find_predecessors( + event_ids: Iterable[str], batch: List[EventBase] +) -> Set[str]: + """ + Walk the tree of dependencies (in batch), and return every event that is + in batch, and is an ancestor of one of the supplied events. + """ + found = set() + unexplored = set(event_ids) + while len(unexplored) > 0: + next_unexplored: Set[str] = set() + + # Iterate through the incoming events, looking for events in our "unexplored" + # set. For each matching event, add it to the "found" set, and add its + # "prev_events" to the "unexplored" set for the next pass. + for event in batch: + if event.event_id in unexplored: + found.add(event.event_id) + next_unexplored.update( + ( + event_id + for event_id in event.prev_event_ids() + if event_id not in found + ) + ) + + unexplored = next_unexplored + + return found diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 0d57b1e9ef..3fbf789a60 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -616,7 +616,8 @@ class EventsPersistenceStorageController: if not events_and_contexts: return replaced_events - await self._assign_stitched_orders(room_id, events_and_contexts) + if not backfilled: + await self._assign_stitched_orders(room_id, events_and_contexts) chunks = [ events_and_contexts[x : x + 100] @@ -698,9 +699,9 @@ class EventsPersistenceStorageController: ["event_id", "before_gap_event_id"], ) + # TODO matching against gaps is pointless here # TODO sort gap_events by DAG;received order for gap_event, before_gap_event_id in gap_events: - logger.debug("Processing received gap event %s", gap_event) matching_events = [gap_event] # TODO find other events in the same gap @@ -721,15 +722,19 @@ class EventsPersistenceStorageController: ) ) - logger.debug("Previous event stitched_ordering = %i", - previous_event_stitched_order) + logger.debug( + "Previous event stitched_ordering = %i", previous_event_stitched_order + ) # if previous_event_stitched_order is None, that means we have a room # where there are existing events or gaps without assigned stitched orders. # Let's give up trying to assign stitched orders here. if previous_event_stitched_order is None: # TODO do something better here - logger.warning("Found gap event %s without assigned stitched order: bailing", gap_event) + logger.warning( + "Found gap event %s without assigned stitched order: bailing", + gap_event, + ) return still_remaining_batch = [] @@ -743,19 +748,21 @@ class EventsPersistenceStorageController: context.stitched_ordering = previous_event_stitched_order logger.debug( "Persisting inserted events with stitched_order=%i", - previous_event_stitched_order + previous_event_stitched_order, ) remaining_batch = still_remaining_batch - logger.debug("Remaining events: %s", [ev.event_id for (ev, _) in - remaining_batch]) + logger.debug( + "Remaining events: %s", [ev.event_id for (ev, _) in remaining_batch] + ) - logger.debug("Remaining events after processing gap matches: %s", [ev.event_id - for (ev, _) in - remaining_batch]) + logger.debug( + "Remaining events after processing gap matches: %s", + [ev.event_id for (ev, _) in remaining_batch], + ) current_max_stream_ordering = ( - await self.persist_events_store.get_room_max_stitched_ordering(room_id) or 0 + await self.main_store.get_room_max_stitched_ordering(room_id) or 0 ) for _event, context in remaining_batch: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 401939f15f..2f38859ea2 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -3558,7 +3558,7 @@ class PersistEventsStore: room_id = events_and_contexts[0][0].room_id # Map from missing event ID, to the lowest stitched order of the events that reference it. - potential_backwards_extremities: Dict[str, int] = {} + potential_backwards_extremities: Dict[str, Optional[int]] = {} for ev, ctx in events_and_contexts: if ev.internal_metadata.is_outlier(): continue @@ -3569,10 +3569,11 @@ class PersistEventsStore: ) persisted_event_stitched_ordering = ctx.stitched_ordering - # EventContext.stitched_ordering is Optional, because it is assigned - # quite late in the persistence process, but it should have been - # assigned by now. - assert persisted_event_stitched_ordering is not None + # If any of the events we persisted did not get assigned a stitched order, + # we cannot yet assign a stitched order to the backwards extremity either. + if persisted_event_stitched_ordering is None: + potential_backwards_extremities[prev_event] = None + continue if lowest_referring_ordering is None: lowest_referring_ordering = persisted_event_stitched_ordering @@ -3671,7 +3672,7 @@ class PersistEventsStore: txn: LoggingTransaction, room_id: str, backward_extremity_event_id: str, - lowest_referring_ordering: int, + lowest_referring_ordering: Optional[int], ) -> Optional[str]: """ Figure out where in the stitched order a gap (or backwards extremity) belongs. @@ -3693,29 +3694,27 @@ class PersistEventsStore: lowest_referring_ordering: The lowest stitched ordering of all the events that we have just inserted, that refer to this backwards extremity. """ - # Given the lowest stitched ordering of all the events that we have just - # inserted, find the previous event (by stitched ordering); the gap - # will likely come just afterwards. - # - # Note: we include "AND event_id <> backward_extremity_event_id" because - # if this backward extremity is actually an outlier, then that event - # does exist in events, but we don't want to find it. - txn.execute( - """ - SELECT event_id, stitched_ordering FROM events - WHERE room_id = ? AND stitched_ordering < ? AND event_id <> ? - ORDER BY stitched_ordering DESC LIMIT 1 - """, - [room_id, lowest_referring_ordering, backward_extremity_event_id], - ) - row = txn.fetchone() - if row is None: - # There is no event in the table before this gap. This shouldn't happen - # (assuming the create event was given a stitched ordering), because the - # create event should always be before any gaps. - (new_before_gap_event_id, new_previous_stitched_ordering) = (None, None) - else: - (new_before_gap_event_id, new_previous_stitched_ordering) = row + (new_before_gap_event_id, new_previous_stitched_ordering) = (None, None) + + if lowest_referring_ordering is not None: + # Given the lowest stitched ordering of all the events that we have just + # inserted, find the previous event (by stitched ordering); the gap + # will likely come just afterwards. + # + # Note: we include "AND event_id <> backward_extremity_event_id" because + # if this backward extremity is actually an outlier, then that event + # does exist in events, but we don't want to find it. + txn.execute( + """ + SELECT event_id, stitched_ordering FROM events + WHERE room_id = ? AND stitched_ordering < ? AND event_id <> ? + ORDER BY stitched_ordering DESC LIMIT 1 + """, + [room_id, lowest_referring_ordering, backward_extremity_event_id], + ) + row = txn.fetchone() + if row is not None: + (new_before_gap_event_id, new_previous_stitched_ordering) = row # If this is an existing backwards extremity, see where it currently # exists in the order. @@ -3744,31 +3743,15 @@ class PersistEventsStore: # This is an existing backwards extremity with an assigned stitched ordering. # Leave it as-is unless we have successfully calculated a new stitched ordering # which is lower than the existing. - if new_previous_stitched_ordering is not None and new_previous_stitched_ordering < existing_previous_stitched_ordering: + if ( + new_previous_stitched_ordering is not None + and new_previous_stitched_ordering < existing_previous_stitched_ordering + ): return new_before_gap_event_id else: return existing_previous_stitched_ordering - async def get_room_max_stitched_ordering(self, room_id: str) -> Optional[int]: - """Get the maximum stitched order for any event currently in the room. - - If no events in this room have an assigned stitched order, returns None. - """ - - def get_room_max_stitched_ordering_txn( - txn: LoggingTransaction, - ) -> Optional[int]: - sql = "SELECT MAX(stitched_ordering) FROM events WHERE room_id=?" - txn.execute(sql, [room_id]) - ret = [r[0] for r in txn] - return ret[0] - - return await self.db_pool.runInteraction( - "get_room_max_stitched_ordering", get_room_max_stitched_ordering_txn - ) - - @attr.s(slots=True, auto_attribs=True) class _LinkMap: """A helper type for tracking links between chains.""" diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index cc031d8996..d6c3776c9f 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -2755,3 +2755,21 @@ class EventsWorkerStore(SQLBaseStore): sender=row[0], received_ts=row[1], ) + + async def get_room_max_stitched_ordering(self, room_id: str) -> Optional[int]: + """Get the maximum stitched order for any event currently in the room. + + If no events in this room have an assigned stitched order, returns None. + """ + + def get_room_max_stitched_ordering_txn( + txn: LoggingTransaction, + ) -> Optional[int]: + sql = "SELECT MAX(stitched_ordering) FROM events WHERE room_id=?" + txn.execute(sql, [room_id]) + ret = [r[0] for r in txn] + return ret[0] + + return await self.db_pool.runInteraction( + "get_room_max_stitched_ordering", get_room_max_stitched_ordering_txn + )