
Compare commits

...

21 Commits

Author SHA1 Message Date

Richard van der Hoff  5f6fff3b42  Persist in stitched order  2025-09-03 15:05:40 +01:00
Richard van der Hoff  858f09bd09  Assign stitched ordering when de-outliering  2025-09-03 14:56:41 +01:00
Richard van der Hoff  3d10f0f9c5  Assign stitched orders to pulled events before persisting  2025-09-03 14:34:02 +01:00
Richard van der Hoff  53fb62de61  Update assign_stitched_orders to not require EventContext  2025-09-03 14:16:50 +01:00
Richard van der Hoff  8728124f76  fixup! Move stitched_ordering property to EventBase  2025-09-03 14:16:09 +01:00
Richard van der Hoff  fb86791f48  Move assign_stitched_orders to top level  2025-09-03 14:05:21 +01:00
Richard van der Hoff  77bfaf91cc  fixup! Move stitched_ordering property to EventBase  2025-09-03 13:57:15 +01:00
Richard van der Hoff  d3710a6fd7  Remove code that attempted to assign stitched orderings retrospectively  2025-09-03 13:56:23 +01:00
Richard van der Hoff  4a5cfed5ca  Move stitched_ordering property to EventBase  2025-09-03 13:20:47 +01:00
    ... so that we don't need to have an EventContext when we do the assignment
Richard van der Hoff  ccdb734051  lint  2025-09-03 13:10:34 +01:00
Richard van der Hoff  596dfdb4b4  Update stitched_order of backfilled events after persisting  2025-09-03 13:10:34 +01:00
Andy Balaam  fe7fef2893  When finding event before a gap, exclude outlier matching the gap  2025-09-03 13:10:34 +01:00
Andy Balaam  f2c0b0bd57  Log more while deciding the stitched order  2025-09-03 13:10:34 +01:00
Richard van der Hoff  883bb8a9b1  Handle gaps with unassigned stitched ordering  2025-09-03 13:10:34 +01:00
Richard van der Hoff  93d1c4f1d5  Rework _find_before_gap_event_id  2025-09-03 13:10:34 +01:00
    There being no before_gap_event_id is an error case, not normal behaviour,
    so we should handle it differently.
Richard van der Hoff  254a83f2f6  tests for find_predecessors  2025-09-03 13:10:34 +01:00
Richard van der Hoff  8415a185e0  WIP on stitched ordering algorithm  2025-09-03 13:10:34 +01:00
Richard van der Hoff  d55ab5ea78  Assign ordering to backward extremities  2025-09-03 13:10:34 +01:00
    We're going to use backward extremities to represent "gaps" in our
    algorithm. (Specifically, a "gap" is represented by a set of backward
    extremities which all appear at the same point in the stitched ordering.)

    Accordingly, we need to assign them a stitched ordering. We're going to do
    this indirectly, by referencing the event that appears before it in the
    stitched ordering, so that there is only one table that has a stitched
    ordering column.
Richard van der Hoff  114541e9f8  Test for simple case  2025-09-03 13:10:34 +01:00
Richard van der Hoff  c9900ae6dd  Assign a dumb stitched ordering to incoming events  2025-09-03 13:10:34 +01:00
    For now, we just give each incoming event the next stitched ordering.
Richard van der Hoff  dbebdab044  Pass EventContext into _update_backward_extremities  2025-09-03 13:10:32 +01:00
9 changed files with 540 additions and 38 deletions

View File

@@ -208,6 +208,8 @@ class EventBase(metaclass=abc.ABCMeta):
self.internal_metadata = EventInternalMetadata(internal_metadata_dict)
self._stitched_ordering: Optional[int] = None
depth: DictProperty[int] = DictProperty("depth")
content: DictProperty[JsonDict] = DictProperty("content")
hashes: DictProperty[Dict[str, str]] = DictProperty("hashes")
@@ -323,6 +325,20 @@ class EventBase(metaclass=abc.ABCMeta):
# this will be a no-op if the event dict is already frozen.
self._dict = freeze(self._dict)
def assign_stitched_ordering(self, stitched_ordering: int) -> None:
"""Assign a stitched ordering to this event, if one has not already been assigned.
TODO: figure out a way to only expose this on events that have not yet been persisted.
"""
if self._stitched_ordering is not None:
raise RuntimeError("Attempt to assign stitched ordering twice")
self._stitched_ordering = stitched_ordering
@property
def stitched_ordering(self) -> Optional[int]:
"""Return the stitched ordering for this event. If one has not (yet) been assigned, returns `None`."""
return self._stitched_ordering
def __str__(self) -> str:
return self.__repr__()
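
A minimal standalone sketch of the assign-once contract above (StitchedStub is an illustrative stand-in, not the real EventBase):

from typing import Optional

class StitchedStub:
    """Illustrative stand-in mirroring the new EventBase behaviour."""

    def __init__(self) -> None:
        self._stitched_ordering: Optional[int] = None

    def assign_stitched_ordering(self, stitched_ordering: int) -> None:
        # A second assignment is a programming error, so fail loudly.
        if self._stitched_ordering is not None:
            raise RuntimeError("Attempt to assign stitched ordering twice")
        self._stitched_ordering = stitched_ordering

    @property
    def stitched_ordering(self) -> Optional[int]:
        return self._stitched_ordering

ev = StitchedStub()
assert ev.stitched_ordering is None  # unassigned until persistence decides
ev.assign_stitched_ordering(2**16)
assert ev.stitched_ordering == 2**16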

View File

@@ -86,6 +86,8 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
from synapse.state import StateResolutionStore
from synapse.storage.controllers.persist_events import assign_stitched_orders
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
PersistedEventPosition,
@@ -894,18 +896,30 @@ class FederationEventHandler:
)
@trace
async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
# We want to sort these by depth so we process them and tell clients about
# them in order. It's also more efficient to backfill this way (`depth`
# ascending) because one backfill event is likely to be the `prev_event` of
# the next event we're going to process.
sorted_events = sorted(new_events, key=lambda x: x.depth)
async def _process_new_pulled_events(new_events: List[EventBase]) -> None:
room_id = new_events[0].room_id
await assign_stitched_orders(room_id, new_events, self._store)
# We want to sort these by stitched ordering, so that events that will
# be sent on to clients over /sync will receive stream_orderings that
# are consistent with stitched orderings (i.e. we will serve them to clients
# in the same order as stitched_order).
#
# It's also more efficient to backfill this way, because one backfill event
# is likely to be the `prev_event` of the next event we're going to process.
#
# Outliers will not yet have received a stitched ordering, but it doesn't
# really matter what order they get persisted in, because they don't get
# sent to clients and we don't do so much state resolution for them. We just
# persist them before any other events.
sorted_events = sorted(new_events, key=lambda x: (x.stitched_ordering or 0))
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
# Check if we've already tried to process these events at some point in the
# past. We aren't concerned with the expontntial backoff here, just whether it
# past. We aren't concerned with the exponential backoff here, just whether it
# has failed to be processed before.
event_ids_with_failed_pull_attempts = (
await self._store.get_event_ids_with_failed_pull_attempts(
@@ -2385,3 +2399,32 @@ class FederationEventHandler:
len(ev.auth_event_ids()),
)
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
def find_predecessors(event_ids: Iterable[str], batch: List[EventBase]) -> Set[str]:
"""
Walk the tree of prev_event dependencies within `batch`, and return the ID of
every event in `batch` that is one of the supplied events or one of their
ancestors.
"""
found = set()
unexplored = set(event_ids)
while len(unexplored) > 0:
next_unexplored: Set[str] = set()
# Iterate through the incoming events, looking for events in our "unexplored"
# set. For each matching event, add it to the "found" set, and add its
# "prev_events" to the "unexplored" set for the next pass.
for event in batch:
if event.event_id in unexplored:
found.add(event.event_id)
next_unexplored.update(
(
event_id
for event_id in event.prev_event_ids()
if event_id not in found
)
)
unexplored = next_unexplored
return found
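
The sort key used in _process_new_pulled_events above deserves a quick illustration: `x.stitched_ordering or 0` maps outliers (which have no stitched ordering yet) to 0, so they sort ahead of every ordered event. A toy sketch, with hypothetical values:

# Hypothetical (event_id, stitched_ordering) pairs; None marks an outlier.
pulled = [("$later", 3 * 2**16), ("$outlier", None), ("$earlier", 2**16)]

# Outliers fall back to 0, so they are persisted before any ordered event.
ordered = sorted(pulled, key=lambda x: (x[1] or 0))
assert [event_id for event_id, _ in ordered] == ["$outlier", "$earlier", "$later"]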

View File

@@ -63,6 +63,7 @@ from synapse.logging.opentracing import (
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import DataStore
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import DeltaState
@@ -616,6 +617,12 @@ class EventsPersistenceStorageController:
if not events_and_contexts:
return replaced_events
# TODO massive hack
if events_and_contexts[0][0].stitched_ordering is None:
await assign_stitched_orders(
room_id, [ev for (ev, _) in events_and_contexts], self.main_store
)
chunks = [
events_and_contexts[x : x + 100]
for x in range(0, len(events_and_contexts), 100)
@@ -1241,3 +1248,120 @@ class EventsPersistenceStorageController:
return True
return False
def find_predecessors(event_ids: Iterable[str], batch: List[EventBase]) -> Set[str]:
"""
Walk the tree of prev_event dependencies within `batch`, and return the ID of
every event in `batch` that is one of the supplied events or one of their
ancestors.
"""
found = set()
unexplored = set(event_ids)
while len(unexplored) > 0:
next_unexplored: Set[str] = set()
# Iterate through the incoming events, looking for events in our "unexplored"
# set. For each matching event, add it to the "found" set, and add its
# "prev_events" to the "unexplored" set for the next pass.
for event in batch:
if event.event_id in unexplored:
found.add(event.event_id)
next_unexplored.update(
(
event_id
for event_id in event.prev_event_ids()
if event_id not in found
)
)
unexplored = next_unexplored
return found
async def assign_stitched_orders(
room_id: str,
events: List[EventBase],
store: DataStore,
) -> None:
"""
Updates the events within `events`, to assign a
stitched_ordering to each event.
"""
# Take a copy of the events we have to process
# TODO find a better way to exclude outliers
remaining_batch = list(ev for ev in events if not ev.internal_metadata.is_outlier())
# Find all events in the current batch which are in a timeline gap
gap_events = await store.db_pool.simple_select_many_batch(
"event_backward_extremities",
"event_id",
(ev.event_id for ev in remaining_batch),
["event_id", "before_gap_event_id"],
)
# TODO matching against gaps is pointless here
# TODO sort gap_events by DAG;received order
for gap_event, before_gap_event_id in gap_events:
logger.debug("Processing received gap event %s", gap_event)
matching_events = [gap_event] # TODO find other events in the same gap
# Find all predecessors of those events in the batch
to_insert = find_predecessors(matching_events, remaining_batch)
logger.debug("Processing to_insert set %s", to_insert)
# Find the stitched order of the event before the gap
# TODO consider doing this with a join
previous_event_stitched_order = await store.db_pool.simple_select_one_onecol(
"events",
{"event_id": before_gap_event_id},
"stitched_ordering",
True,
)
logger.debug(
"Previous event stitched_ordering = %i", previous_event_stitched_order
)
# if previous_event_stitched_order is None, that means we have a room
# where there are existing events or gaps without assigned stitched orders.
# Let's give up trying to assign stitched orders here.
if previous_event_stitched_order is None:
# TODO do something better here
logger.warning(
"Found gap event %s without assigned stitched order: bailing",
gap_event,
)
return
still_remaining_batch = []
for event in remaining_batch:
if event.event_id not in to_insert:
still_remaining_batch.append(event)
continue
# TODO we may need to reorder existing events
previous_event_stitched_order += 1
event.assign_stitched_ordering(previous_event_stitched_order)
logger.debug(
"Persisting inserted events with stitched_order=%i",
previous_event_stitched_order,
)
remaining_batch = still_remaining_batch
logger.debug("Remaining events: %s", [ev.event_id for ev in remaining_batch])
logger.debug(
"Remaining events after processing gap matches: %s",
[ev.event_id for ev in remaining_batch],
)
current_max_stitched_ordering = (
    await store.get_room_max_stitched_ordering(room_id) or 0
)
for event in remaining_batch:
    current_max_stitched_ordering += 2**16
    event.assign_stitched_ordering(current_max_stitched_ordering)
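
A toy walkthrough of the two assignment paths in assign_stitched_orders (all values hypothetical): events that fill a known gap are packed one step apart immediately after the gap's before-gap event, while anything left over is appended past the room's current maximum in strides of 2**16, leaving room for later insertions.

STRIDE = 2**16  # spacing used on the append path

assigned = {}

# Gap path: the before-gap event sits at stitched_ordering 3, and the batch
# fills the gap with two events, assigned consecutive orderings.
order = 3
for event_id in ["$backfill_1", "$backfill_2"]:
    order += 1
    assigned[event_id] = order

# Append path: one unrelated event remains in the batch; it goes one stride
# past the room's current maximum (pretend that maximum is 5 * STRIDE).
room_max = 5 * STRIDE
for event_id in ["$new"]:
    room_max += STRIDE
    assigned[event_id] = room_max

assert assigned == {"$backfill_1": 4, "$backfill_2": 5, "$new": 6 * STRIDE}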

View File

@@ -2596,6 +2596,10 @@ class PersistEventsStore:
# scenario. XXX: does this cause bugs? It will mean we won't send such
# events down /sync. In general they will be historical events, so that
# doesn't matter too much, but that is not always the case.
#
# On the other hand, we *will* assign a stitched ordering at this point.
# Outliers are not assigned stitched orderings when they are first
# persisted as outliers.
logger.info(
"_update_outliers_txn: Updating state for ex-outlier event %s",
@@ -2624,12 +2628,12 @@ class PersistEventsStore:
},
)
sql = "UPDATE events SET outlier = FALSE WHERE event_id = ?"
txn.execute(sql, (event.event_id,))
sql = "UPDATE events SET outlier = FALSE, stitched_ordering = ? WHERE event_id = ?"
txn.execute(sql, (event.stitched_ordering, event.event_id,))
# Update the event_backward_extremities table now that this
# event isn't an outlier any more.
self._update_backward_extremeties(txn, [event])
self._update_backward_extremeties(txn, [(event, context)])
return [ec for ec in events_and_contexts if ec[0] not in to_remove]
@@ -2687,6 +2691,7 @@ class PersistEventsStore:
"contains_url",
"state_key",
"rejection_reason",
"stitched_ordering",
),
values=[
(
@@ -2705,6 +2710,7 @@ class PersistEventsStore:
"url" in event.content and isinstance(event.content["url"], str),
event.get_state_key(),
context.rejected,
event.stitched_ordering,
)
for event, context in events_and_contexts
],
@@ -2811,7 +2817,8 @@ class PersistEventsStore:
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
self._handle_mult_prev_events(
txn, events=[event for event, _ in events_and_contexts]
txn,
events_and_contexts,
)
for event, _ in events_and_contexts:
@@ -3517,7 +3524,9 @@ class PersistEventsStore:
)
def _handle_mult_prev_events(
self, txn: LoggingTransaction, events: List[EventBase]
self,
txn: LoggingTransaction,
events_and_contexts: List[EventPersistencePair],
) -> None:
"""
For the given event, update the event edges table and forward and
@@ -3528,14 +3537,18 @@ class PersistEventsStore:
table="event_edges",
keys=("event_id", "prev_event_id"),
values=[
(ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids()
(ev.event_id, e_id)
for (ev, _) in events_and_contexts
for e_id in ev.prev_event_ids()
],
)
self._update_backward_extremeties(txn, events)
self._update_backward_extremeties(txn, events_and_contexts)
def _update_backward_extremeties(
self, txn: LoggingTransaction, events: List[EventBase]
self,
txn: LoggingTransaction,
events_and_contexts: List[EventPersistencePair],
) -> None:
"""Updates the event_backward_extremities tables based on the new/updated
events being persisted.
@@ -3546,45 +3559,73 @@ class PersistEventsStore:
Forward extremities are handled when we first start persisting the events.
"""
room_id = events[0].room_id
room_id = events_and_contexts[0][0].room_id
potential_backwards_extremities = {
e_id
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
}
# Map from missing event ID to the lowest stitched order of the events that reference it.
potential_backwards_extremities: Dict[str, Optional[int]] = {}
for ev, ctx in events_and_contexts:
if ev.internal_metadata.is_outlier():
continue
for prev_event in ev.prev_event_ids():
lowest_referring_ordering = potential_backwards_extremities.get(
    prev_event
)
persisted_event_stitched_ordering = ev.stitched_ordering
# If any of the events we persisted did not get assigned a stitched order,
# we cannot yet assign a stitched order to the backwards extremity either.
if persisted_event_stitched_ordering is None:
potential_backwards_extremities[prev_event] = None
continue
if lowest_referring_ordering is None:
lowest_referring_ordering = persisted_event_stitched_ordering
else:
lowest_referring_ordering = min(
lowest_referring_ordering, persisted_event_stitched_ordering
)
potential_backwards_extremities[prev_event] = lowest_referring_ordering
if not potential_backwards_extremities:
return
existing_events_outliers = self.db_pool.simple_select_many_txn(
# Filter potential_backwards_extremities to remove events that already
# exist (as non-outliers) in the events table.
existing_events = self.db_pool.simple_select_many_txn(
txn,
table="events",
column="event_id",
iterable=potential_backwards_extremities,
iterable=potential_backwards_extremities.keys(),
keyvalues={"outlier": False},
retcols=("event_id",),
)
for (ev,) in existing_events:
del potential_backwards_extremities[ev]
potential_backwards_extremities.difference_update(
e for (e,) in existing_events_outliers
)
if potential_backwards_extremities:
self.db_pool.simple_upsert_many_txn(
txn,
table="event_backward_extremities",
key_names=("room_id", "event_id"),
key_values=[(room_id, ev) for ev in potential_backwards_extremities],
value_names=(),
value_values=(),
for (
backward_extremity,
lowest_referring_ordering,
) in potential_backwards_extremities.items():
before_gap_event_id = self._find_before_gap_event_id(
txn, room_id, backward_extremity, lowest_referring_ordering
)
self.db_pool.simple_upsert_txn(
txn,
table="event_backward_extremities",
keyvalues={
"room_id": room_id,
"event_id": backward_extremity,
},
values={"before_gap_event_id": before_gap_event_id},
)
if potential_backwards_extremities:
# Record the stream orderings where we have new gaps.
gap_events = [
(room_id, self._instance_name, ev.internal_metadata.stream_ordering)
for ev in events
for (ev, _) in events_and_contexts
if any(
e_id in potential_backwards_extremities
for e_id in ev.prev_event_ids()
@@ -3605,7 +3646,7 @@ class PersistEventsStore:
)
backward_extremity_tuples_to_remove = [
(ev.event_id, ev.room_id)
for ev in events
for (ev, _) in events_and_contexts
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
@@ -3630,6 +3671,90 @@ class PersistEventsStore:
backward_extremity_tuples_to_remove,
)
@staticmethod
def _find_before_gap_event_id(
txn: LoggingTransaction,
room_id: str,
backward_extremity_event_id: str,
lowest_referring_ordering: Optional[int],
) -> Optional[str]:
"""
Figure out where in the stitched order a gap (or backwards extremity) belongs.
The result is in terms of the event that precedes the gap in the ordering.
"None" means we were unable to find a preceding event, which should only
happen if the create event was not assigned a stitched ordering.
We check if the backwards extremity already exists in the database, at an
earlier ordering than that implied by `lowest_referring_ordering`, and if
so return that location. Otherwise, we return the event before
`lowest_referring_ordering`.
Args:
txn
room_id: ID of the room that the gap is in
backward_extremity_event_id: Event ID of the backwards extremity (i.e.
an event that is not in our database).
lowest_referring_ordering: The lowest stitched ordering of all the events
that we have just inserted, that refer to this backwards extremity.
"""
(new_before_gap_event_id, new_previous_stitched_ordering) = (None, None)
if lowest_referring_ordering is not None:
# Given the lowest stitched ordering of all the events that we have just
# inserted, find the previous event (by stitched ordering); the gap
# will likely come just afterwards.
#
# Note: we include "AND event_id <> backward_extremity_event_id" because
# if this backward extremity is actually an outlier, then that event
# does exist in events, but we don't want to find it.
txn.execute(
"""
SELECT event_id, stitched_ordering FROM events
WHERE room_id = ? AND stitched_ordering < ? AND event_id <> ?
ORDER BY stitched_ordering DESC LIMIT 1
""",
[room_id, lowest_referring_ordering, backward_extremity_event_id],
)
row = txn.fetchone()
if row is not None:
(new_before_gap_event_id, new_previous_stitched_ordering) = row
# If this is an existing backwards extremity, see where it currently
# exists in the order.
txn.execute(
"""
SELECT events.event_id, events.stitched_ordering FROM
event_backward_extremities LEFT JOIN events ON
events.event_id = event_backward_extremities.before_gap_event_id
WHERE event_backward_extremities.event_id = ?
""",
[backward_extremity_event_id],
)
row = txn.fetchone()
if row is None:
# Not an existing backwards extremity: use our new before_gap_event_id
return new_before_gap_event_id
(existing_before_gap_id, existing_previous_stitched_ordering) = row
# If the existing backwards extremity has not yet been assigned a
# stitched ordering, use our new before_gap_event_id.
if existing_previous_stitched_ordering is None:
return new_before_gap_event_id
# This is an existing backwards extremity with an assigned stitched ordering.
# Leave it as-is unless we have successfully calculated a new stitched ordering
# which is lower than the existing.
if (
new_previous_stitched_ordering is not None
and new_previous_stitched_ordering < existing_previous_stitched_ordering
):
return new_before_gap_event_id
else:
return existing_before_gap_id
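
The tail of _find_before_gap_event_id reduces to "keep whichever before-gap event sits earliest in the stitched ordering, preferring the new candidate when the existing row is absent or unordered". A pure-Python sketch of that rule (names are illustrative, not Synapse API):

from typing import Optional, Tuple

# Each candidate is (before_gap_event_id, stitched_ordering of that event).
Candidate = Tuple[Optional[str], Optional[int]]

def pick_before_gap(new: Candidate, existing: Optional[Candidate]) -> Optional[str]:
    new_id, new_order = new
    if existing is None:
        return new_id  # not a known backwards extremity yet
    existing_id, existing_order = existing
    if existing_order is None:
        return new_id  # existing row was never assigned an ordering
    if new_order is not None and new_order < existing_order:
        return new_id  # the new position is earlier in the stitched ordering
    return existing_id

assert pick_before_gap(("$e5", 5), None) == "$e5"
assert pick_before_gap(("$e5", 5), ("$e9", 9)) == "$e5"
assert pick_before_gap(("$e9", 9), ("$e5", 5)) == "$e5"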
@attr.s(slots=True, auto_attribs=True)
class _LinkMap:

View File

@@ -2755,3 +2755,21 @@ class EventsWorkerStore(SQLBaseStore):
sender=row[0],
received_ts=row[1],
)
async def get_room_max_stitched_ordering(self, room_id: str) -> Optional[int]:
"""Get the maximum stitched order for any event currently in the room.
If no events in this room have an assigned stitched order, returns None.
"""
def get_room_max_stitched_ordering_txn(
txn: LoggingTransaction,
) -> Optional[int]:
sql = "SELECT MAX(stitched_ordering) FROM events WHERE room_id=?"
txn.execute(sql, [room_id])
ret = [r[0] for r in txn]
return ret[0]
return await self.db_pool.runInteraction(
"get_room_max_stitched_ordering", get_room_max_stitched_ordering_txn
)
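
A quick sanity check of the NULL handling this helper relies on (sqlite3 here is purely for illustration; Synapse goes through its own db_pool): MAX() over zero rows still yields exactly one row containing NULL, which is why indexing ret[0] is safe.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (room_id TEXT, stitched_ordering BIGINT)")

# No events yet: MAX() still returns exactly one row, containing NULL.
row = conn.execute(
    "SELECT MAX(stitched_ordering) FROM events WHERE room_id = ?", ("!r",)
).fetchone()
assert row == (None,)

conn.execute("INSERT INTO events VALUES ('!r', 65536)")
row = conn.execute(
    "SELECT MAX(stitched_ordering) FROM events WHERE room_id = ?", ("!r",)
).fetchone()
assert row == (65536,)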

View File

@@ -0,0 +1,40 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
ALTER TABLE events ADD COLUMN stitched_ordering BIGINT;
CREATE UNIQUE INDEX events_stitched_order ON events(room_id, stitched_ordering); -- TODO make this concurrent
--CREATE TABLE stitched_ordering_gaps (
-- room_id TEXT NOT NULL,
-- following_event_id TEXT NOT NULL,
-- missing_event_id TEXT NOT NULL,
-- UNIQUE (room_id, following_event_id, missing_event_id)
--);
--CREATE INDEX stitched_ordering_gaps_missing_events ON stitched_ordering_gaps(room_id, missing_event_id);
-- Gaps in the stitched ordering are equivalent to a group of backward extremities that appear at
-- the same point in the stitched ordering.
--
-- Rather than explicitly tracking where in the stitched ordering a given gap appears, we record the
-- event id of the event that comes *before* the gap in the stitched ordering. Doing so means that:
--
-- 1. There is only one table that has a `stitched_ordering` column, making it easier to figure out
-- how to insert a batch of events between existing events (and making the UNIQUE constraint effective).
--
-- 2. We don't need to allocate space in the stitched ordering for gaps; in particular we can assign an order
-- to gaps *after* we have persisted the events. (We could probably work around this by double-spacing inserted
-- events, but it's still a nice property.)
--
-- Note that this assumes that we never need to insert an event *before* a gap (or if we did,
-- we'd have to update this table).
ALTER TABLE event_backward_extremities ADD COLUMN before_gap_event_id TEXT REFERENCES events (event_id);

View File

View File

@@ -0,0 +1,91 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
from typing import List
import attr
from tests.unittest import TestCase
from synapse.storage.controllers.persist_events import find_predecessors
class FindPredecessorsTestCase(TestCase):
def test_predecessors_finds_nothing_if_event_is_not_in_batch(self) -> None:
batch = [
FakeEvent(event_id="B", prev_event_ids=["C"]),
]
predecessors = find_predecessors({"A"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, set())
def test_predecessors_finds_only_event_if_it_has_no_predecessors(self) -> None:
batch = [
FakeEvent(event_id="E1", prev_event_ids=[]),
FakeEvent(event_id="E2", prev_event_ids=["E3"]),
]
predecessors = find_predecessors({"E1"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, {"E1"})
def test_predecessors_finds_all_ancestors(self) -> None:
batch = [
FakeEvent(event_id="A", prev_event_ids=["B", "C"]),
FakeEvent(event_id="B", prev_event_ids=["D"]),
FakeEvent(event_id="C", prev_event_ids=["D"]),
FakeEvent(event_id="D", prev_event_ids=["E"]),
FakeEvent(event_id="E", prev_event_ids=[]),
FakeEvent(event_id="F", prev_event_ids=["G", "H"]),
FakeEvent(event_id="G", prev_event_ids=[]),
]
predecessors = find_predecessors({"A"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, {"A", "B", "C", "D", "E"})
def test_predecessors_ignores_cycles(self) -> None:
batch = [
FakeEvent(event_id="E1", prev_event_ids=["E2"]),
FakeEvent(event_id="E2", prev_event_ids=["E1"]),
]
predecessors = find_predecessors({"E1"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, {"E1", "E2"})
def test_predecessors_ignores_self_reference_cycles(self) -> None:
batch = [
FakeEvent(event_id="E1", prev_event_ids=["E2"]),
FakeEvent(event_id="E2", prev_event_ids=["E2"]),
]
predecessors = find_predecessors({"E1"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, {"E1", "E2"})
def test_predecessors_finds_ancestors_of_multiple_starting_events(self) -> None:
batch = [
FakeEvent(event_id="A", prev_event_ids=["B"]),
FakeEvent(event_id="B", prev_event_ids=[]),
FakeEvent(event_id="C", prev_event_ids=["D"]),
FakeEvent(event_id="D", prev_event_ids=["E"]),
FakeEvent(event_id="E", prev_event_ids=[]),
FakeEvent(event_id="F", prev_event_ids=["G"]),
FakeEvent(event_id="G", prev_event_ids=[]),
]
predecessors = find_predecessors({"A", "C"}, batch) # type: ignore[arg-type]
self.assertEqual(predecessors, {"A", "B", "C", "D", "E"})
@attr.s(auto_attribs=True)
class FakeEvent:
event_id: str
_prev_event_ids: List[str]
def prev_event_ids(self) -> List[str]:
return self._prev_event_ids

View File

@@ -31,6 +31,7 @@ from synapse.federation.federation_base import event_from_pdu_json
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.storage.controllers.persist_events import assign_stitched_orders
from synapse.types import StateMap
from synapse.util import Clock
@@ -519,3 +520,47 @@ class InvalideUsersInRoomCacheTestCase(HomeserverTestCase):
users = self.get_success(self.store.get_users_in_room(room_id))
self.assertEqual(users, [])
class AssignStitchedOrderingTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def prepare(
self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
) -> None:
self.state = self.hs.get_state_handler()
# persistence = self.hs.get_storage_controllers().persistence
# assert persistence is not None
# self._persistence = persistence
self.store = self.hs.get_datastores().main
def test_insert_events(self) -> None:
# Create a room
self.register_user("user", "pass")
token = self.login("user", "pass")
room_id = self.helper.create_room_as(
"user", room_version=RoomVersions.V12.identifier, tok=token
)
# Build a test event
test_event = event_from_pdu_json(
{
"type": EventTypes.Message,
"content": {"body": "blah"},
"room_id": room_id,
"sender": "@user:other",
"depth": 5,
"prev_events": [],
"auth_events": [],
"origin_server_ts": self.clock.time_msec(),
},
RoomVersions.V12,
)
self.get_success(assign_stitched_orders(room_id, [test_event], self.store))
self.assertEqual(test_event.stitched_ordering, 6 * 2**16)
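
(The expected value follows from the append path in assign_stitched_orders: creating the room presumably persists five events with the default preset (the create event, the creator's membership, power levels, join rules and history visibility), each appended one 2**16 stride apart, so this sixth event lands at 6 * 2**16.)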