Update stitched_order of backfilled events after persisting
This commit is contained in:
@@ -86,6 +86,7 @@ from synapse.replication.http.federation import (
|
||||
ReplicationFederationSendEventsRestServlet,
|
||||
)
|
||||
from synapse.state import StateResolutionStore
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
|
||||
from synapse.types import (
|
||||
PersistedEventPosition,
|
||||
@@ -904,8 +905,11 @@ class FederationEventHandler:
|
||||
with nested_logging_context(ev.event_id):
|
||||
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
||||
|
||||
if backfilled:
|
||||
await self._assign_stitched_orderings(new_events)
|
||||
|
||||
# Check if we've already tried to process these events at some point in the
|
||||
# past. We aren't concerned with the expontntial backoff here, just whether it
|
||||
# past. We aren't concerned with the exponential backoff here, just whether it
|
||||
# has failed to be processed before.
|
||||
event_ids_with_failed_pull_attempts = (
|
||||
await self._store.get_event_ids_with_failed_pull_attempts(
|
||||
@@ -2385,3 +2389,130 @@ class FederationEventHandler:
|
||||
len(ev.auth_event_ids()),
|
||||
)
|
||||
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
|
||||
|
||||
async def _assign_stitched_orderings(
|
||||
self,
|
||||
events: List[EventBase],
|
||||
) -> None:
|
||||
"""
|
||||
Update persisted backfilled events with a stitched order
|
||||
"""
|
||||
room_id = events[0].room_id
|
||||
|
||||
# Map from event id to assigned stitched ordering
|
||||
stitched_orderings: Dict[str, int] = {}
|
||||
|
||||
# Take a copy of the events we have to process
|
||||
remaining_batch = list(events)
|
||||
|
||||
# Find all events in the current batch which are in a timeline gap
|
||||
gap_events = await self._store.db_pool.simple_select_many_batch(
|
||||
"event_backward_extremities",
|
||||
"event_id",
|
||||
(ev.event_id for ev in events),
|
||||
["event_id", "before_gap_event_id"],
|
||||
)
|
||||
|
||||
# TODO sort gap_events by DAG;received order
|
||||
for gap_event, before_gap_event_id in gap_events:
|
||||
logger.debug("Processing received gap event %s", gap_event)
|
||||
|
||||
matching_events = [gap_event] # TODO find other events in the same gap
|
||||
|
||||
# Find all predecessors of those events in the batch
|
||||
to_insert = find_predecessors(matching_events, remaining_batch)
|
||||
|
||||
logger.debug("Processing to_insert set %s", to_insert)
|
||||
|
||||
# Find the stitched order of the event before the gap
|
||||
# TODO consider doing this with a join
|
||||
previous_event_stitched_order = (
|
||||
await self._store.db_pool.simple_select_one_onecol(
|
||||
"events",
|
||||
{"event_id": before_gap_event_id},
|
||||
"stitched_ordering",
|
||||
True,
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"Previous event stitched_ordering = %i", previous_event_stitched_order
|
||||
)
|
||||
|
||||
# if previous_event_stitched_order is None, that means we have a room
|
||||
# where there are existing events or gaps without assigned stitched orders.
|
||||
# Let's give up trying to assign stitched orders here.
|
||||
if previous_event_stitched_order is None:
|
||||
# TODO do something better here
|
||||
logger.warning(
|
||||
"Found gap event %s without assigned stitched order: bailing",
|
||||
gap_event,
|
||||
)
|
||||
return
|
||||
|
||||
still_remaining_batch = []
|
||||
for event, context in remaining_batch:
|
||||
if event.event_id not in to_insert:
|
||||
still_remaining_batch.append((event, context))
|
||||
continue
|
||||
|
||||
# TODO we may need to reorder existing events
|
||||
previous_event_stitched_order += 1
|
||||
stitched_orderings[event.event_id] = previous_event_stitched_order
|
||||
|
||||
remaining_batch = still_remaining_batch
|
||||
logger.debug(
|
||||
"Remaining events: %s", [ev.event_id for ev in remaining_batch]
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"Remaining events after processing gap matches: %s",
|
||||
[ev.event_id for ev in remaining_batch],
|
||||
)
|
||||
|
||||
current_max_stream_ordering = (
|
||||
await self._store.get_room_max_stitched_ordering(room_id) or 0
|
||||
)
|
||||
|
||||
for event in remaining_batch:
|
||||
current_max_stream_ordering += 2**16
|
||||
stitched_orderings[event.event_id] = current_max_stream_ordering
|
||||
|
||||
logger.info("Updating stitched orderings for received backfilled events: %s", stitched_orderings)
|
||||
|
||||
def update(txn: LoggingTransaction) -> None:
|
||||
for event_id, ordering in stitched_orderings.items():
|
||||
txn.execute("UPDATE events SET stitched_ordering = ? WHERE event_id = ?", [ordering, event_id])
|
||||
|
||||
await self._store.db_pool.runInteraction("update_stitched_orderings", update)
|
||||
|
||||
|
||||
def find_predecessors(
|
||||
event_ids: Iterable[str], batch: List[EventBase]
|
||||
) -> Set[str]:
|
||||
"""
|
||||
Walk the tree of dependencies (in batch), and return every event that is
|
||||
in batch, and is an ancestor of one of the supplied events.
|
||||
"""
|
||||
found = set()
|
||||
unexplored = set(event_ids)
|
||||
while len(unexplored) > 0:
|
||||
next_unexplored: Set[str] = set()
|
||||
|
||||
# Iterate through the incoming events, looking for events in our "unexplored"
|
||||
# set. For each matching event, add it to the "found" set, and add its
|
||||
# "prev_events" to the "unexplored" set for the next pass.
|
||||
for event in batch:
|
||||
if event.event_id in unexplored:
|
||||
found.add(event.event_id)
|
||||
next_unexplored.update(
|
||||
(
|
||||
event_id
|
||||
for event_id in event.prev_event_ids()
|
||||
if event_id not in found
|
||||
)
|
||||
)
|
||||
|
||||
unexplored = next_unexplored
|
||||
|
||||
return found
|
||||
|
||||
@@ -616,7 +616,8 @@ class EventsPersistenceStorageController:
|
||||
if not events_and_contexts:
|
||||
return replaced_events
|
||||
|
||||
await self._assign_stitched_orders(room_id, events_and_contexts)
|
||||
if not backfilled:
|
||||
await self._assign_stitched_orders(room_id, events_and_contexts)
|
||||
|
||||
chunks = [
|
||||
events_and_contexts[x : x + 100]
|
||||
@@ -698,9 +699,9 @@ class EventsPersistenceStorageController:
|
||||
["event_id", "before_gap_event_id"],
|
||||
)
|
||||
|
||||
# TODO matching against gaps is pointless here
|
||||
# TODO sort gap_events by DAG;received order
|
||||
for gap_event, before_gap_event_id in gap_events:
|
||||
|
||||
logger.debug("Processing received gap event %s", gap_event)
|
||||
|
||||
matching_events = [gap_event] # TODO find other events in the same gap
|
||||
@@ -721,15 +722,19 @@ class EventsPersistenceStorageController:
|
||||
)
|
||||
)
|
||||
|
||||
logger.debug("Previous event stitched_ordering = %i",
|
||||
previous_event_stitched_order)
|
||||
logger.debug(
|
||||
"Previous event stitched_ordering = %i", previous_event_stitched_order
|
||||
)
|
||||
|
||||
# if previous_event_stitched_order is None, that means we have a room
|
||||
# where there are existing events or gaps without assigned stitched orders.
|
||||
# Let's give up trying to assign stitched orders here.
|
||||
if previous_event_stitched_order is None:
|
||||
# TODO do something better here
|
||||
logger.warning("Found gap event %s without assigned stitched order: bailing", gap_event)
|
||||
logger.warning(
|
||||
"Found gap event %s without assigned stitched order: bailing",
|
||||
gap_event,
|
||||
)
|
||||
return
|
||||
|
||||
still_remaining_batch = []
|
||||
@@ -743,19 +748,21 @@ class EventsPersistenceStorageController:
|
||||
context.stitched_ordering = previous_event_stitched_order
|
||||
logger.debug(
|
||||
"Persisting inserted events with stitched_order=%i",
|
||||
previous_event_stitched_order
|
||||
previous_event_stitched_order,
|
||||
)
|
||||
|
||||
remaining_batch = still_remaining_batch
|
||||
logger.debug("Remaining events: %s", [ev.event_id for (ev, _) in
|
||||
remaining_batch])
|
||||
logger.debug(
|
||||
"Remaining events: %s", [ev.event_id for (ev, _) in remaining_batch]
|
||||
)
|
||||
|
||||
logger.debug("Remaining events after processing gap matches: %s", [ev.event_id
|
||||
for (ev, _) in
|
||||
remaining_batch])
|
||||
logger.debug(
|
||||
"Remaining events after processing gap matches: %s",
|
||||
[ev.event_id for (ev, _) in remaining_batch],
|
||||
)
|
||||
|
||||
current_max_stream_ordering = (
|
||||
await self.persist_events_store.get_room_max_stitched_ordering(room_id) or 0
|
||||
await self.main_store.get_room_max_stitched_ordering(room_id) or 0
|
||||
)
|
||||
|
||||
for _event, context in remaining_batch:
|
||||
|
||||
@@ -3558,7 +3558,7 @@ class PersistEventsStore:
|
||||
room_id = events_and_contexts[0][0].room_id
|
||||
|
||||
# Map from missing event ID, to the lowest stitched order of the events that reference it.
|
||||
potential_backwards_extremities: Dict[str, int] = {}
|
||||
potential_backwards_extremities: Dict[str, Optional[int]] = {}
|
||||
for ev, ctx in events_and_contexts:
|
||||
if ev.internal_metadata.is_outlier():
|
||||
continue
|
||||
@@ -3569,10 +3569,11 @@ class PersistEventsStore:
|
||||
)
|
||||
persisted_event_stitched_ordering = ctx.stitched_ordering
|
||||
|
||||
# EventContext.stitched_ordering is Optional, because it is assigned
|
||||
# quite late in the persistence process, but it should have been
|
||||
# assigned by now.
|
||||
assert persisted_event_stitched_ordering is not None
|
||||
# If any of the events we persisted did not get assigned a stitched order,
|
||||
# we cannot yet assign a stitched order to the backwards extremity either.
|
||||
if persisted_event_stitched_ordering is None:
|
||||
potential_backwards_extremities[prev_event] = None
|
||||
continue
|
||||
|
||||
if lowest_referring_ordering is None:
|
||||
lowest_referring_ordering = persisted_event_stitched_ordering
|
||||
@@ -3671,7 +3672,7 @@ class PersistEventsStore:
|
||||
txn: LoggingTransaction,
|
||||
room_id: str,
|
||||
backward_extremity_event_id: str,
|
||||
lowest_referring_ordering: int,
|
||||
lowest_referring_ordering: Optional[int],
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Figure out where in the stitched order a gap (or backwards extremity) belongs.
|
||||
@@ -3693,29 +3694,27 @@ class PersistEventsStore:
|
||||
lowest_referring_ordering: The lowest stitched ordering of all the events
|
||||
that we have just inserted, that refer to this backwards extremity.
|
||||
"""
|
||||
# Given the lowest stitched ordering of all the events that we have just
|
||||
# inserted, find the previous event (by stitched ordering); the gap
|
||||
# will likely come just afterwards.
|
||||
#
|
||||
# Note: we include "AND event_id <> backward_extremity_event_id" because
|
||||
# if this backward extremity is actually an outlier, then that event
|
||||
# does exist in events, but we don't want to find it.
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT event_id, stitched_ordering FROM events
|
||||
WHERE room_id = ? AND stitched_ordering < ? AND event_id <> ?
|
||||
ORDER BY stitched_ordering DESC LIMIT 1
|
||||
""",
|
||||
[room_id, lowest_referring_ordering, backward_extremity_event_id],
|
||||
)
|
||||
row = txn.fetchone()
|
||||
if row is None:
|
||||
# There is no event in the table before this gap. This shouldn't happen
|
||||
# (assuming the create event was given a stitched ordering), because the
|
||||
# create event should always be before any gaps.
|
||||
(new_before_gap_event_id, new_previous_stitched_ordering) = (None, None)
|
||||
else:
|
||||
(new_before_gap_event_id, new_previous_stitched_ordering) = row
|
||||
(new_before_gap_event_id, new_previous_stitched_ordering) = (None, None)
|
||||
|
||||
if lowest_referring_ordering is not None:
|
||||
# Given the lowest stitched ordering of all the events that we have just
|
||||
# inserted, find the previous event (by stitched ordering); the gap
|
||||
# will likely come just afterwards.
|
||||
#
|
||||
# Note: we include "AND event_id <> backward_extremity_event_id" because
|
||||
# if this backward extremity is actually an outlier, then that event
|
||||
# does exist in events, but we don't want to find it.
|
||||
txn.execute(
|
||||
"""
|
||||
SELECT event_id, stitched_ordering FROM events
|
||||
WHERE room_id = ? AND stitched_ordering < ? AND event_id <> ?
|
||||
ORDER BY stitched_ordering DESC LIMIT 1
|
||||
""",
|
||||
[room_id, lowest_referring_ordering, backward_extremity_event_id],
|
||||
)
|
||||
row = txn.fetchone()
|
||||
if row is not None:
|
||||
(new_before_gap_event_id, new_previous_stitched_ordering) = row
|
||||
|
||||
# If this is an existing backwards extremity, see where it currently
|
||||
# exists in the order.
|
||||
@@ -3744,31 +3743,15 @@ class PersistEventsStore:
|
||||
# This is an existing backwards extremity with an assigned stitched ordering.
|
||||
# Leave it as-is unless we have successfully calculated a new stitched ordering
|
||||
# which is lower than the existing.
|
||||
if new_previous_stitched_ordering is not None and new_previous_stitched_ordering < existing_previous_stitched_ordering:
|
||||
if (
|
||||
new_previous_stitched_ordering is not None
|
||||
and new_previous_stitched_ordering < existing_previous_stitched_ordering
|
||||
):
|
||||
return new_before_gap_event_id
|
||||
else:
|
||||
return existing_previous_stitched_ordering
|
||||
|
||||
|
||||
async def get_room_max_stitched_ordering(self, room_id: str) -> Optional[int]:
|
||||
"""Get the maximum stitched order for any event currently in the room.
|
||||
|
||||
If no events in this room have an assigned stitched order, returns None.
|
||||
"""
|
||||
|
||||
def get_room_max_stitched_ordering_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Optional[int]:
|
||||
sql = "SELECT MAX(stitched_ordering) FROM events WHERE room_id=?"
|
||||
txn.execute(sql, [room_id])
|
||||
ret = [r[0] for r in txn]
|
||||
return ret[0]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_room_max_stitched_ordering", get_room_max_stitched_ordering_txn
|
||||
)
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class _LinkMap:
|
||||
"""A helper type for tracking links between chains."""
|
||||
|
||||
@@ -2755,3 +2755,21 @@ class EventsWorkerStore(SQLBaseStore):
|
||||
sender=row[0],
|
||||
received_ts=row[1],
|
||||
)
|
||||
|
||||
async def get_room_max_stitched_ordering(self, room_id: str) -> Optional[int]:
|
||||
"""Get the maximum stitched order for any event currently in the room.
|
||||
|
||||
If no events in this room have an assigned stitched order, returns None.
|
||||
"""
|
||||
|
||||
def get_room_max_stitched_ordering_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Optional[int]:
|
||||
sql = "SELECT MAX(stitched_ordering) FROM events WHERE room_id=?"
|
||||
txn.execute(sql, [room_id])
|
||||
ret = [r[0] for r in txn]
|
||||
return ret[0]
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_room_max_stitched_ordering", get_room_max_stitched_ordering_txn
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user