Remove code that attempted to assign stitched orderings retrospectively
This commit is contained in:
@@ -905,9 +905,6 @@ class FederationEventHandler:
|
||||
with nested_logging_context(ev.event_id):
|
||||
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
||||
|
||||
if backfilled:
|
||||
await self._assign_stitched_orderings(new_events)
|
||||
|
||||
# Check if we've already tried to process these events at some point in the
|
||||
# past. We aren't concerned with the exponential backoff here, just whether it
|
||||
# has failed to be processed before.
|
||||
@@ -2390,103 +2387,6 @@ class FederationEventHandler:
|
||||
)
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
|
||||
|
||||
async def _assign_stitched_orderings(
|
||||
self,
|
||||
events: List[EventBase],
|
||||
) -> None:
|
||||
"""
|
||||
Update persisted backfilled events with a stitched order
|
||||
"""
|
||||
room_id = events[0].room_id
|
||||
|
||||
# Map from event id to assigned stitched ordering
|
||||
stitched_orderings: Dict[str, int] = {}
|
||||
|
||||
# Take a copy of the events we have to process
|
||||
remaining_batch = list(events)
|
||||
|
||||
# Find all events in the current batch which are in a timeline gap
|
||||
gap_events = await self._store.db_pool.simple_select_many_batch(
|
||||
"event_backward_extremities",
|
||||
"event_id",
|
||||
(ev.event_id for ev in events),
|
||||
["event_id", "before_gap_event_id"],
|
||||
)
|
||||
|
||||
# TODO sort gap_events by DAG;received order
|
||||
for gap_event, before_gap_event_id in gap_events:
|
||||
logger.debug("Processing received gap event %s", gap_event)
|
||||
|
||||
matching_events = [gap_event] # TODO find other events in the same gap
|
||||
|
||||
# Find all predecessors of those events in the batch
|
||||
to_insert = find_predecessors(matching_events, remaining_batch)
|
||||
|
||||
logger.debug("Processing to_insert set %s", to_insert)
|
||||
|
||||
# Find the stitched order of the event before the gap
|
||||
# TODO consider doing this with a join
|
||||
previous_event_stitched_order = (
|
||||
await self._store.db_pool.simple_select_one_onecol(
|
||||
"events",
|
||||
{"event_id": before_gap_event_id},
|
||||
"stitched_ordering",
|
||||
True,
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"Previous event stitched_ordering = %i", previous_event_stitched_order
|
||||
)
|
||||
|
||||
# if previous_event_stitched_order is None, that means we have a room
|
||||
# where there are existing events or gaps without assigned stitched orders.
|
||||
# Let's give up trying to assign stitched orders here.
|
||||
if previous_event_stitched_order is None:
|
||||
# TODO do something better here
|
||||
logger.warning(
|
||||
"Found gap event %s without assigned stitched order: bailing",
|
||||
gap_event,
|
||||
)
|
||||
return
|
||||
|
||||
still_remaining_batch = []
|
||||
for event, context in remaining_batch:
|
||||
if event.event_id not in to_insert:
|
||||
still_remaining_batch.append((event, context))
|
||||
continue
|
||||
|
||||
# TODO we may need to reorder existing events
|
||||
previous_event_stitched_order += 1
|
||||
stitched_orderings[event.event_id] = previous_event_stitched_order
|
||||
|
||||
remaining_batch = still_remaining_batch
|
||||
logger.debug(
|
||||
"Remaining events: %s", [ev.event_id for ev in remaining_batch]
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"Remaining events after processing gap matches: %s",
|
||||
[ev.event_id for ev in remaining_batch],
|
||||
)
|
||||
|
||||
current_max_stream_ordering = (
|
||||
await self._store.get_room_max_stitched_ordering(room_id) or 0
|
||||
)
|
||||
|
||||
for event in remaining_batch:
|
||||
current_max_stream_ordering += 2**16
|
||||
stitched_orderings[event.event_id] = current_max_stream_ordering
|
||||
|
||||
logger.info("Updating stitched orderings for received backfilled events: %s", stitched_orderings)
|
||||
|
||||
def update(txn: LoggingTransaction) -> None:
|
||||
for event_id, ordering in stitched_orderings.items():
|
||||
txn.execute("UPDATE events SET stitched_ordering = ? WHERE event_id = ?", [ordering, event_id])
|
||||
|
||||
await self._store.db_pool.runInteraction("update_stitched_orderings", update)
|
||||
|
||||
|
||||
def find_predecessors(
|
||||
event_ids: Iterable[str], batch: List[EventBase]
|
||||
) -> Set[str]:
|
||||
|
||||
Reference in New Issue
Block a user