Compare commits
21 Commits: devon/acl-...rav/stitch

| SHA1 |
|---|
| 5f6fff3b42 |
| 858f09bd09 |
| 3d10f0f9c5 |
| 53fb62de61 |
| 8728124f76 |
| fb86791f48 |
| 77bfaf91cc |
| d3710a6fd7 |
| 4a5cfed5ca |
| ccdb734051 |
| 596dfdb4b4 |
| fe7fef2893 |
| f2c0b0bd57 |
| 883bb8a9b1 |
| 93d1c4f1d5 |
| 254a83f2f6 |
| 8415a185e0 |
| d55ab5ea78 |
| 114541e9f8 |
| c9900ae6dd |
| dbebdab044 |
@@ -208,6 +208,8 @@ class EventBase(metaclass=abc.ABCMeta):

        self.internal_metadata = EventInternalMetadata(internal_metadata_dict)

+       self._stitched_ordering: Optional[int] = None
+
    depth: DictProperty[int] = DictProperty("depth")
    content: DictProperty[JsonDict] = DictProperty("content")
    hashes: DictProperty[Dict[str, str]] = DictProperty("hashes")

@@ -323,6 +325,20 @@ class EventBase(metaclass=abc.ABCMeta):
        # this will be a no-op if the event dict is already frozen.
        self._dict = freeze(self._dict)

+   def assign_stitched_ordering(self, stitched_ordering: int) -> None:
+       """Assign a stitched ordering to this event, if one has not already been assigned.
+
+       TODO: figure out a way to only expose this on events that have not yet been persisted.
+       """
+       if self._stitched_ordering is not None:
+           raise RuntimeError("Attempt to assign stitched ordering twice")
+       self._stitched_ordering = stitched_ordering
+
+   @property
+   def stitched_ordering(self) -> Optional[int]:
+       """Return the stitched ordering for this event. If one has not (yet) been assigned, returns `None`."""
+       return self._stitched_ordering
+
    def __str__(self) -> str:
        return self.__repr__()
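For orientation, a minimal usage sketch of the new accessors (my own, not part of the diff; `make_event_from_dict` is the existing Synapse helper, and the event dict contents are placeholders):

```python
from synapse.api.room_versions import RoomVersions
from synapse.events import make_event_from_dict

# Build a bare event; the dict contents here are illustrative placeholders.
event = make_event_from_dict(
    {
        "type": "m.room.message",
        "content": {"body": "blah"},
        "room_id": "!room:example.org",
        "sender": "@user:example.org",
        "depth": 5,
        "prev_events": [],
        "auth_events": [],
        "origin_server_ts": 0,
    },
    RoomVersions.V12,
)

assert event.stitched_ordering is None  # nothing assigned yet
event.assign_stitched_ordering(5 * 2**16)  # normally done during persistence
assert event.stitched_ordering == 5 * 2**16
# A second call to assign_stitched_ordering() would raise RuntimeError.
```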
@@ -86,6 +86,8 @@ from synapse.replication.http.federation import (
    ReplicationFederationSendEventsRestServlet,
)
from synapse.state import StateResolutionStore
+from synapse.storage.controllers.persist_events import assign_stitched_orders
+from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
    PersistedEventPosition,

@@ -894,18 +896,30 @@ class FederationEventHandler:
        )

    @trace
-   async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
-       # We want to sort these by depth so we process them and tell clients about
-       # them in order. It's also more efficient to backfill this way (`depth`
-       # ascending) because one backfill event is likely to be the `prev_event` of
-       # the next event we're going to process.
-       sorted_events = sorted(new_events, key=lambda x: x.depth)
+   async def _process_new_pulled_events(new_events: List[EventBase]) -> None:
+       room_id = new_events[0].room_id
+       await assign_stitched_orders(room_id, new_events, self._store)
+
+       # We want to sort these by stitched ordering, so that events that will
+       # be sent on to clients over /sync will receive stream_orderings that
+       # are consistent with stitched orderings (i.e. we will serve them to clients
+       # in the same order as stitched_order).
+       #
+       # It's also more efficient to backfill this way, because one backfill event
+       # is likely to be the `prev_event` of the next event we're going to process.
+       #
+       # Outliers will not yet have received a stitched ordering, but it doesn't
+       # really matter what order they get persisted in, because they don't get
+       # sent to clients and we don't do so much state resolution for them. We just
+       # persist them before any other events.
+       sorted_events = sorted(new_events, key=lambda x: (x.stitched_ordering or 0))
        for ev in sorted_events:
            with nested_logging_context(ev.event_id):
                await self._process_pulled_event(origin, ev, backfilled=backfilled)

        # Check if we've already tried to process these events at some point in the
-       # past. We aren't concerned with the expontntial backoff here, just whether it
+       # past. We aren't concerned with the exponential backoff here, just whether it
        # has failed to be processed before.
        event_ids_with_failed_pull_attempts = (
            await self._store.get_event_ids_with_failed_pull_attempts(
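A quick illustration of the sort key (my own sketch, not part of the diff): `or 0` maps the `None` ordering of outliers below every assigned ordering, since assigned orderings are spaced at multiples of 2**16:

```python
# Assigned stitched orderings are multiples of 2**16, so `or 0` sorts
# events with no ordering (outliers) before everything else.
orderings = [3 * 2**16, None, 2 * 2**16]
print(sorted(orderings, key=lambda o: o or 0))
# -> [None, 131072, 196608]
```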
@@ -2385,3 +2399,32 @@ class FederationEventHandler:
            len(ev.auth_event_ids()),
        )
        raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
+
+
+def find_predecessors(event_ids: Iterable[str], batch: List[EventBase]) -> Set[str]:
+   """
+   Walk the tree of dependencies (in batch), and return every event that is
+   in batch, and is an ancestor of one of the supplied events.
+   """
+   found = set()
+   unexplored = set(event_ids)
+   while len(unexplored) > 0:
+       next_unexplored: Set[str] = set()
+
+       # Iterate through the incoming events, looking for events in our "unexplored"
+       # set. For each matching event, add it to the "found" set, and add its
+       # "prev_events" to the "unexplored" set for the next pass.
+       for event in batch:
+           if event.event_id in unexplored:
+               found.add(event.event_id)
+               next_unexplored.update(
+                   event_id
+                   for event_id in event.prev_event_ids()
+                   if event_id not in found
+               )
+
+       unexplored = next_unexplored
+
+   return found

@@ -63,6 +63,7 @@ from synapse.logging.opentracing import (
)
from synapse.metrics import SERVER_NAME_LABEL
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage import DataStore
from synapse.storage.controllers.state import StateStorageController
from synapse.storage.databases import Databases
from synapse.storage.databases.main.events import DeltaState

@@ -616,6 +617,12 @@ class EventsPersistenceStorageController:
        if not events_and_contexts:
            return replaced_events

+       # TODO massive hack
+       if events_and_contexts[0][0].stitched_ordering is None:
+           await assign_stitched_orders(
+               room_id, [ev for (ev, _) in events_and_contexts], self.main_store
+           )
+
        chunks = [
            events_and_contexts[x : x + 100]
            for x in range(0, len(events_and_contexts), 100)

@@ -1241,3 +1248,120 @@ class EventsPersistenceStorageController:
            return True

        return False
+
+
+def find_predecessors(event_ids: Iterable[str], batch: List[EventBase]) -> Set[str]:
+   """
+   Walk the tree of dependencies (in batch), and return every event that is
+   in batch, and is an ancestor of one of the supplied events.
+   """
+   found = set()
+   unexplored = set(event_ids)
+   while len(unexplored) > 0:
+       next_unexplored: Set[str] = set()
+
+       # Iterate through the incoming events, looking for events in our "unexplored"
+       # set. For each matching event, add it to the "found" set, and add its
+       # "prev_events" to the "unexplored" set for the next pass.
+       for event in batch:
+           if event.event_id in unexplored:
+               found.add(event.event_id)
+               next_unexplored.update(
+                   event_id
+                   for event_id in event.prev_event_ids()
+                   if event_id not in found
+               )
+
+       unexplored = next_unexplored
+
+   return found
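As written, `find_predecessors` rescans the whole batch once per generation of the walk, giving O(depth × len(batch)) behaviour. An index-based variant with the same semantics would be linear in the batch size; a sketch (my own, not part of the diff):

```python
from typing import Iterable, List, Set

def find_predecessors_indexed(event_ids: Iterable[str], batch: List) -> Set[str]:
    # Build an event_id -> event map once, then walk prev_events with a stack.
    by_id = {ev.event_id: ev for ev in batch}
    found: Set[str] = set()
    stack = [e for e in event_ids if e in by_id]
    while stack:
        event_id = stack.pop()
        if event_id in found:
            continue
        found.add(event_id)
        stack.extend(
            prev
            for prev in by_id[event_id].prev_event_ids()
            if prev not in found and prev in by_id
        )
    return found
```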
+
+async def assign_stitched_orders(
+   room_id: str,
+   events: List[EventBase],
+   store: DataStore,
+) -> None:
+   """
+   Updates the events within `events`, to assign a stitched_ordering to each event.
+   """
+   # Take a copy of the events we have to process
+   # TODO find a better way to exclude outliers
+   remaining_batch = [ev for ev in events if not ev.internal_metadata.is_outlier()]
+
+   # Find all events in the current batch which are in a timeline gap
+   gap_events = await store.db_pool.simple_select_many_batch(
+       "event_backward_extremities",
+       "event_id",
+       (ev.event_id for ev in remaining_batch),
+       ["event_id", "before_gap_event_id"],
+   )
+
+   # TODO matching against gaps is pointless here
+   # TODO sort gap_events by DAG;received order
+   for gap_event, before_gap_event_id in gap_events:
+       logger.debug("Processing received gap event %s", gap_event)
+
+       matching_events = [gap_event]  # TODO find other events in the same gap
+
+       # Find all predecessors of those events in the batch
+       to_insert = find_predecessors(matching_events, remaining_batch)
+
+       logger.debug("Processing to_insert set %s", to_insert)
+
+       # Find the stitched order of the event before the gap
+       # TODO consider doing this with a join
+       previous_event_stitched_order = await store.db_pool.simple_select_one_onecol(
+           "events",
+           {"event_id": before_gap_event_id},
+           "stitched_ordering",
+           True,
+       )
+
+       logger.debug(
+           "Previous event stitched_ordering = %i", previous_event_stitched_order
+       )
+
+       # if previous_event_stitched_order is None, that means we have a room
+       # where there are existing events or gaps without assigned stitched orders.
+       # Let's give up trying to assign stitched orders here.
+       if previous_event_stitched_order is None:
+           # TODO do something better here
+           logger.warning(
+               "Found gap event %s without assigned stitched order: bailing",
+               gap_event,
+           )
+           return
+
+       still_remaining_batch = []
+       for event in remaining_batch:
+           if event.event_id not in to_insert:
+               still_remaining_batch.append(event)
+               continue
+
+           # TODO we may need to reorder existing events
+           previous_event_stitched_order += 1
+           event.assign_stitched_ordering(previous_event_stitched_order)
+           logger.debug(
+               "Persisting inserted events with stitched_order=%i",
+               previous_event_stitched_order,
+           )
+
+       remaining_batch = still_remaining_batch
+       logger.debug("Remaining events: %s", [ev.event_id for ev in remaining_batch])
+
+   logger.debug(
+       "Remaining events after processing gap matches: %s",
+       [ev.event_id for ev in remaining_batch],
+   )
+
+   current_max_stream_ordering = (
+       await store.get_room_max_stitched_ordering(room_id) or 0
+   )
+
+   for event in remaining_batch:
+       current_max_stream_ordering += 2**16
+       event.assign_stitched_ordering(current_max_stream_ordering)
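To make the arithmetic concrete, a standalone sketch (my own, not part of the diff) of the two assignment paths: gap-filling events are packed just after the gap's `before_gap_event_id`, while everything else is appended after the room maximum in steps of 2**16:

```python
# Timeline A < B < E < F; E's prev_event C was missing, so the gap after B
# was recorded with before_gap_event_id = "B". C and D now arrive, plus a
# brand-new event G that belongs at the end of the room.
orderings = {"A": 1 * 2**16, "B": 2 * 2**16, "E": 3 * 2**16, "F": 4 * 2**16}

# Gap path: slot the late arrivals into the 2**16-wide space after B.
previous = orderings["B"]
for ev in ["C", "D"]:
    previous += 1
    orderings[ev] = previous

# Append path: step 2**16 past the current room maximum.
orderings["G"] = max(orderings.values()) + 2**16

print(sorted(orderings, key=orderings.get))
# -> ['A', 'B', 'C', 'D', 'E', 'F', 'G']
```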
@@ -2596,6 +2596,10 @@ class PersistEventsStore:
            # scenario. XXX: does this cause bugs? It will mean we won't send such
            # events down /sync. In general they will be historical events, so that
            # doesn't matter too much, but that is not always the case.
+           #
+           # On the other hand, we *will* assign a stitched ordering at this point.
+           # Outliers are not assigned stitched orderings when they are first
+           # persisted as outliers.

            logger.info(
                "_update_outliers_txn: Updating state for ex-outlier event %s",

@@ -2624,12 +2628,12 @@ class PersistEventsStore:
                },
            )

-           sql = "UPDATE events SET outlier = FALSE WHERE event_id = ?"
-           txn.execute(sql, (event.event_id,))
+           sql = "UPDATE events SET outlier = FALSE, stitched_ordering = ? WHERE event_id = ?"
+           txn.execute(sql, (event.stitched_ordering, event.event_id))

            # Update the event_backward_extremities table now that this
            # event isn't an outlier any more.
-           self._update_backward_extremeties(txn, [event])
+           self._update_backward_extremeties(txn, [(event, context)])

        return [ec for ec in events_and_contexts if ec[0] not in to_remove]

@@ -2687,6 +2691,7 @@ class PersistEventsStore:
                "contains_url",
                "state_key",
                "rejection_reason",
+               "stitched_ordering",
            ),
            values=[
                (

@@ -2705,6 +2710,7 @@ class PersistEventsStore:
                    "url" in event.content and isinstance(event.content["url"], str),
                    event.get_state_key(),
                    context.rejected,
+                   event.stitched_ordering,
                )
                for event, context in events_and_contexts
            ],

@@ -2811,7 +2817,8 @@ class PersistEventsStore:
        # Update the event_forward_extremities, event_backward_extremities and
        # event_edges tables.
        self._handle_mult_prev_events(
-           txn, events=[event for event, _ in events_and_contexts]
+           txn,
+           events_and_contexts,
        )

        for event, _ in events_and_contexts:

@@ -3517,7 +3524,9 @@ class PersistEventsStore:
        )

    def _handle_mult_prev_events(
-       self, txn: LoggingTransaction, events: List[EventBase]
+       self,
+       txn: LoggingTransaction,
+       events_and_contexts: List[EventPersistencePair],
    ) -> None:
        """
        For the given event, update the event edges table and forward and

@@ -3528,14 +3537,18 @@ class PersistEventsStore:
            table="event_edges",
            keys=("event_id", "prev_event_id"),
            values=[
-               (ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids()
+               (ev.event_id, e_id)
+               for (ev, _) in events_and_contexts
+               for e_id in ev.prev_event_ids()
            ],
        )

-       self._update_backward_extremeties(txn, events)
+       self._update_backward_extremeties(txn, events_and_contexts)

    def _update_backward_extremeties(
-       self, txn: LoggingTransaction, events: List[EventBase]
+       self,
+       txn: LoggingTransaction,
+       events_and_contexts: List[EventPersistencePair],
    ) -> None:
        """Updates the event_backward_extremities tables based on the new/updated
        events being persisted.

@@ -3546,45 +3559,73 @@ class PersistEventsStore:
        Forward extremities are handled when we first start persisting the events.
        """

-       room_id = events[0].room_id
+       room_id = events_and_contexts[0][0].room_id

-       potential_backwards_extremities = {
-           e_id
-           for ev in events
-           for e_id in ev.prev_event_ids()
-           if not ev.internal_metadata.is_outlier()
-       }
+       # Map from missing event ID, to the lowest stitched order of the events that reference it.
+       potential_backwards_extremities: Dict[str, Optional[int]] = {}
+       for ev, ctx in events_and_contexts:
+           if ev.internal_metadata.is_outlier():
+               continue
+
+           for prev_event in ev.prev_event_ids():
+               lowest_referring_ordering = potential_backwards_extremities.get(
+                   prev_event
+               )
+               persisted_event_stitched_ordering = ev.stitched_ordering
+
+               # If any of the events we persisted did not get assigned a stitched order,
+               # we cannot yet assign a stitched order to the backwards extremity either.
+               if persisted_event_stitched_ordering is None:
+                   potential_backwards_extremities[prev_event] = None
+                   continue
+
+               if lowest_referring_ordering is None:
+                   lowest_referring_ordering = persisted_event_stitched_ordering
+               else:
+                   lowest_referring_ordering = min(
+                       lowest_referring_ordering, persisted_event_stitched_ordering
+                   )
+               potential_backwards_extremities[prev_event] = lowest_referring_ordering

        if not potential_backwards_extremities:
            return

-       existing_events_outliers = self.db_pool.simple_select_many_txn(
+       # Filter potential_backwards_extremities to remove events that are in the
+       # events table.
+       existing_events = self.db_pool.simple_select_many_txn(
            txn,
            table="events",
            column="event_id",
-           iterable=potential_backwards_extremities,
+           iterable=potential_backwards_extremities.keys(),
            keyvalues={"outlier": False},
            retcols=("event_id",),
        )
+       for (ev,) in existing_events:
+           del potential_backwards_extremities[ev]

-       potential_backwards_extremities.difference_update(
-           e for (e,) in existing_events_outliers
-       )
-
-       if potential_backwards_extremities:
-           self.db_pool.simple_upsert_many_txn(
-               txn,
-               table="event_backward_extremities",
-               key_names=("room_id", "event_id"),
-               key_values=[(room_id, ev) for ev in potential_backwards_extremities],
-               value_names=(),
-               value_values=(),
-           )
+       for (
+           backward_extremity,
+           lowest_referring_ordering,
+       ) in potential_backwards_extremities.items():
+           before_gap_event_id = self._find_before_gap_event_id(
+               txn, room_id, backward_extremity, lowest_referring_ordering
+           )
+
+           self.db_pool.simple_upsert_txn(
+               txn,
+               table="event_backward_extremities",
+               keyvalues={
+                   "room_id": room_id,
+                   "event_id": backward_extremity,
+               },
+               values={"before_gap_event_id": before_gap_event_id},
+           )

+       if potential_backwards_extremities:
            # Record the stream orderings where we have new gaps.
            gap_events = [
                (room_id, self._instance_name, ev.internal_metadata.stream_ordering)
-               for ev in events
+               for (ev, _) in events_and_contexts
                if any(
                    e_id in potential_backwards_extremities
                    for e_id in ev.prev_event_ids()

@@ -3605,7 +3646,7 @@ class PersistEventsStore:
            )
            backward_extremity_tuples_to_remove = [
                (ev.event_id, ev.room_id)
-               for ev in events
+               for (ev, _) in events_and_contexts
                if not ev.internal_metadata.is_outlier()
                # If we encountered an event with no prev_events, then we might
                # as well remove it now because it won't ever have anything else
@@ -3630,6 +3671,90 @@ class PersistEventsStore:
            backward_extremity_tuples_to_remove,
        )

+   @staticmethod
+   def _find_before_gap_event_id(
+       txn: LoggingTransaction,
+       room_id: str,
+       backward_extremity_event_id: str,
+       lowest_referring_ordering: Optional[int],
+   ) -> Optional[str]:
+       """
+       Figure out where in the stitched order a gap (or backwards extremity) belongs.
+
+       The result is in terms of the event that precedes the gap in the ordering.
+       "None" means we were unable to find a preceding event, which should only
+       happen if the create event was not assigned a stitched ordering.
+
+       We check if the backwards extremity already exists in the database, at an
+       earlier ordering than that implied by `lowest_referring_ordering`, and if
+       so return that location. Otherwise, we return the event before
+       `lowest_referring_ordering`.
+
+       Args:
+           txn
+           room_id: ID of the room that the gap is in
+           backward_extremity_event_id: Event ID of the backwards extremity (i.e.
+               an event that is not in our database).
+           lowest_referring_ordering: The lowest stitched ordering of all the events
+               that we have just inserted, that refer to this backwards extremity.
+       """
+       (new_before_gap_event_id, new_previous_stitched_ordering) = (None, None)
+
+       if lowest_referring_ordering is not None:
+           # Given the lowest stitched ordering of all the events that we have just
+           # inserted, find the previous event (by stitched ordering); the gap
+           # will likely come just afterwards.
+           #
+           # Note: we include "AND event_id <> backward_extremity_event_id" because
+           # if this backward extremity is actually an outlier, then that event
+           # does exist in events, but we don't want to find it.
+           txn.execute(
+               """
+               SELECT event_id, stitched_ordering FROM events
+               WHERE room_id = ? AND stitched_ordering < ? AND event_id <> ?
+               ORDER BY stitched_ordering DESC LIMIT 1
+               """,
+               [room_id, lowest_referring_ordering, backward_extremity_event_id],
+           )
+           row = txn.fetchone()
+           if row is not None:
+               (new_before_gap_event_id, new_previous_stitched_ordering) = row
+
+       # If this is an existing backwards extremity, see where it currently
+       # exists in the order.
+       txn.execute(
+           """
+           SELECT events.event_id, events.stitched_ordering FROM
+               event_backward_extremities LEFT JOIN events ON
+               events.event_id = event_backward_extremities.before_gap_event_id
+           WHERE event_backward_extremities.event_id = ?
+           """,
+           [backward_extremity_event_id],
+       )
+       row = txn.fetchone()
+
+       if row is None:
+           # Not an existing backwards extremity: use our new before_gap_event_id
+           return new_before_gap_event_id
+
+       (existing_before_gap_id, existing_previous_stitched_ordering) = row
+
+       # If the existing backwards extremity has not yet been assigned a
+       # stitched ordering, use our new before_gap_event_id.
+       if existing_previous_stitched_ordering is None:
+           return new_before_gap_event_id
+
+       # This is an existing backwards extremity with an assigned stitched ordering.
+       # Leave it as-is unless we have successfully calculated a new stitched ordering
+       # which is lower than the existing one.
+       if (
+           new_previous_stitched_ordering is not None
+           and new_previous_stitched_ordering < existing_previous_stitched_ordering
+       ):
+           return new_before_gap_event_id
+       else:
+           return existing_before_gap_id


@attr.s(slots=True, auto_attribs=True)
class _LinkMap:
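The decision at the end of `_find_before_gap_event_id` can be summarised in a small standalone function (my own distillation, not part of the diff):

```python
from typing import Optional, Tuple

def choose_before_gap(
    new_id: Optional[str],
    new_ordering: Optional[int],
    existing: Optional[Tuple[Optional[str], Optional[int]]],
) -> Optional[str]:
    # No existing extremity row: take the freshly computed location.
    if existing is None:
        return new_id
    existing_id, existing_ordering = existing
    # Existing location has no ordering yet: the new one is at least as good.
    if existing_ordering is None:
        return new_id
    # Move the gap only if the new location is strictly earlier.
    if new_ordering is not None and new_ordering < existing_ordering:
        return new_id
    return existing_id
```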
@@ -2755,3 +2755,21 @@ class EventsWorkerStore(SQLBaseStore):
            sender=row[0],
            received_ts=row[1],
        )
+
+   async def get_room_max_stitched_ordering(self, room_id: str) -> Optional[int]:
+       """Get the maximum stitched order for any event currently in the room.
+
+       If no events in this room have an assigned stitched order, returns None.
+       """
+
+       def get_room_max_stitched_ordering_txn(
+           txn: LoggingTransaction,
+       ) -> Optional[int]:
+           sql = "SELECT MAX(stitched_ordering) FROM events WHERE room_id=?"
+           txn.execute(sql, [room_id])
+           ret = [r[0] for r in txn]
+           return ret[0]
+
+       return await self.db_pool.runInteraction(
+           "get_room_max_stitched_ordering", get_room_max_stitched_ordering_txn
+       )
synapse/storage/schema/main/delta/92/10_stitched_order.sql (new file, 40 lines)
@@ -0,0 +1,40 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

ALTER TABLE events ADD COLUMN stitched_ordering BIGINT;
CREATE UNIQUE INDEX events_stitched_order ON events(room_id, stitched_ordering); -- TODO make this concurrent

--CREATE TABLE stitched_ordering_gaps (
--    room_id TEXT NOT NULL,
--    following_event_id TEXT NOT NULL,
--    missing_event_id TEXT NOT NULL,
--    UNIQUE (room_id, following_event_id, missing_event_id)
--);
--CREATE INDEX stitched_ordering_gaps_missing_events ON stitched_ordering_gaps(room_id, missing_event_id);

-- Gaps in the stitched ordering are equivalent to a group of backward extremities that appear at
-- the same point in the stitched ordering.
--
-- Rather than explicitly tracking where in the stitched ordering a given gap appears, we record the
-- event id of the event that comes *before* the gap in the stitched ordering. Doing so means that:
--
-- 1. There is only one table that has a `stitched_ordering` column, making it easier to figure out
--    how to insert a batch of events between existing events (and making the UNIQUE constraint effective).
--
-- 2. We don't need to allocate space in the stitched ordering for gaps; in particular we can assign an order
--    to gaps *after* we have persisted the events. (We could probably work around this by double-spacing inserted
--    events? but still, it's a nice property)
--
-- Note that this assumes that we never need to insert an event *before* a gap (or if we did,
-- we'd have to update this table).
ALTER TABLE event_backward_extremities ADD COLUMN before_gap_event_id TEXT REFERENCES events (event_id);
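To illustrate how the new column gets used (my own sketch, not part of the diff): all backward extremities in a room that share a `before_gap_event_id` form a single gap at that point in the ordering, so the members of a gap can be recovered with one query:

```python
# Hypothetical helper: list the missing events that make up the gap that
# follows `before_gap_event_id` in a room's stitched ordering.
def get_gap_members(txn, room_id: str, before_gap_event_id: str) -> list:
    txn.execute(
        """
        SELECT event_id FROM event_backward_extremities
        WHERE room_id = ? AND before_gap_event_id = ?
        """,
        (room_id, before_gap_event_id),
    )
    return [row[0] for row in txn]
```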
tests/storage/controllers/__init__.py (new, empty file)

tests/storage/controllers/test_persist_events.py (new file, 91 lines)
@@ -0,0 +1,91 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.

from typing import List

import attr

from synapse.storage.controllers.persist_events import find_predecessors

from tests.unittest import TestCase


class FindPredecessorsTestCase(TestCase):
    def test_predecessors_finds_nothing_if_event_is_not_in_batch(self) -> None:
        batch = [
            FakeEvent(event_id="B", prev_event_ids=["C"]),
        ]

        predecessors = find_predecessors({"A"}, batch)  # type: ignore[arg-type]
        self.assertEqual(predecessors, set())

    def test_predecessors_finds_only_event_if_it_has_no_predecessors(self) -> None:
        batch = [
            FakeEvent(event_id="E1", prev_event_ids=[]),
            FakeEvent(event_id="E2", prev_event_ids=["E3"]),
        ]

        predecessors = find_predecessors({"E1"}, batch)  # type: ignore[arg-type]
        self.assertEqual(predecessors, {"E1"})

    def test_predecessors_finds_all_ancestors(self) -> None:
        batch = [
            FakeEvent(event_id="A", prev_event_ids=["B", "C"]),
            FakeEvent(event_id="B", prev_event_ids=["D"]),
            FakeEvent(event_id="C", prev_event_ids=["D"]),
            FakeEvent(event_id="D", prev_event_ids=["E"]),
            FakeEvent(event_id="E", prev_event_ids=[]),
            FakeEvent(event_id="F", prev_event_ids=["G", "H"]),
            FakeEvent(event_id="G", prev_event_ids=[]),
        ]
        predecessors = find_predecessors({"A"}, batch)  # type: ignore[arg-type]
        self.assertEqual(predecessors, {"A", "B", "C", "D", "E"})

    def test_predecessors_ignores_cycles(self) -> None:
        batch = [
            FakeEvent(event_id="E1", prev_event_ids=["E2"]),
            FakeEvent(event_id="E2", prev_event_ids=["E1"]),
        ]

        predecessors = find_predecessors({"E1"}, batch)  # type: ignore[arg-type]
        self.assertEqual(predecessors, {"E1", "E2"})

    def test_predecessors_ignores_self_reference_cycles(self) -> None:
        batch = [
            FakeEvent(event_id="E1", prev_event_ids=["E2"]),
            FakeEvent(event_id="E2", prev_event_ids=["E2"]),
        ]

        predecessors = find_predecessors({"E1"}, batch)  # type: ignore[arg-type]
        self.assertEqual(predecessors, {"E1", "E2"})

    def test_predecessors_finds_ancestors_of_multiple_starting_events(self) -> None:
        batch = [
            FakeEvent(event_id="A", prev_event_ids=["B"]),
            FakeEvent(event_id="B", prev_event_ids=[]),
            FakeEvent(event_id="C", prev_event_ids=["D"]),
            FakeEvent(event_id="D", prev_event_ids=["E"]),
            FakeEvent(event_id="E", prev_event_ids=[]),
            FakeEvent(event_id="F", prev_event_ids=["G"]),
            FakeEvent(event_id="G", prev_event_ids=[]),
        ]
        predecessors = find_predecessors({"A", "C"}, batch)  # type: ignore[arg-type]
        self.assertEqual(predecessors, {"A", "B", "C", "D", "E"})


@attr.s(auto_attribs=True)
class FakeEvent:
    event_id: str
    _prev_event_ids: List[str]

    def prev_event_ids(self) -> List[str]:
        return self._prev_event_ids
@@ -31,6 +31,7 @@ from synapse.federation.federation_base import event_from_pdu_json
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
+from synapse.storage.controllers.persist_events import assign_stitched_orders
from synapse.types import StateMap
from synapse.util import Clock

@@ -519,3 +520,47 @@ class InvalideUsersInRoomCacheTestCase(HomeserverTestCase):

        users = self.get_success(self.store.get_users_in_room(room_id))
        self.assertEqual(users, [])
+
+
+class AssignStitchedOrderingTestCase(HomeserverTestCase):
+   servlets = [
+       admin.register_servlets,
+       room.register_servlets,
+       login.register_servlets,
+   ]
+
+   def prepare(
+       self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
+   ) -> None:
+       self.state = self.hs.get_state_handler()
+       # persistence = self.hs.get_storage_controllers().persistence
+       # assert persistence is not None
+       # self._persistence = persistence
+       self.store = self.hs.get_datastores().main
+
+   def test_insert_events(self) -> None:
+       # Create a room
+       self.register_user("user", "pass")
+       token = self.login("user", "pass")
+       room_id = self.helper.create_room_as(
+           "user", room_version=RoomVersions.V12.identifier, tok=token
+       )
+
+       # Build a test event
+       test_event = event_from_pdu_json(
+           {
+               "type": EventTypes.Message,
+               "content": {"body": "blah"},
+               "room_id": room_id,
+               "sender": "@user:other",
+               "depth": 5,
+               "prev_events": [],
+               "auth_events": [],
+               "origin_server_ts": self.clock.time_msec(),
+           },
+           RoomVersions.V12,
+       )
+
+       self.get_success(assign_stitched_orders(room_id, [test_event], self.store))
+
+       self.assertEqual(test_event.stitched_ordering, 6 * 2**16)
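A note on the expected value (my reading, not stated in the diff): room creation with the default public preset persists five events (m.room.create, the creator's m.room.member, m.room.power_levels, m.room.join_rules, m.room.history_visibility), which receive stitched orderings 1 * 2**16 through 5 * 2**16; the test event is then appended one 2**16 step past the room maximum:

```python
# Assumed bookkeeping behind the assertion above.
room_creation_events = 5  # assumption: public-preset room creation
room_max = room_creation_events * 2**16
assert room_max + 2**16 == 6 * 2**16
```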