diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 101b469bb4..18b991d607 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -35,6 +35,7 @@ from typing import ( ) import attr +from mypy_extensions import DefaultNamedArg from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -112,7 +113,10 @@ times_pruned_extremities = Counter( @attr.s(auto_attribs=True, slots=True) class _EventPersistQueueItem: events_and_contexts: List[Tuple[EventBase, EventContext]] - backfilled: bool + should_calculate_state_and_forward_extrems: bool + use_negative_stream_ordering: bool + inhibit_local_membership_updates: bool + update_room_forward_stream_ordering: bool deferred: ObservableDeferred parent_opentracing_span_contexts: List = attr.ib(factory=list) @@ -133,7 +137,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]): def __init__( self, per_item_callback: Callable[ - [List[Tuple[EventBase, EventContext]], bool], + [ + List[Tuple[EventBase, EventContext]], + DefaultNamedArg( + bool, "should_calculate_state_and_forward_extrems" # noqa: F821 + ), + DefaultNamedArg(bool, "use_negative_stream_ordering"), # noqa: F821 + DefaultNamedArg(bool, "inhibit_local_membership_updates"), # noqa: F821 + DefaultNamedArg( + bool, "update_room_forward_stream_ordering" # noqa: F821 + ), + ], Awaitable[_PersistResult], ], ): @@ -150,7 +164,11 @@ class _EventPeristenceQueue(Generic[_PersistResult]): self, room_id: str, events_and_contexts: Iterable[Tuple[EventBase, EventContext]], - backfilled: bool, + *, + should_calculate_state_and_forward_extrems: bool = True, + use_negative_stream_ordering: bool = False, + inhibit_local_membership_updates: bool = False, + update_room_forward_stream_ordering: bool = True, ) -> _PersistResult: """Add events to the queue, with the given persist_event options. @@ -160,7 +178,19 @@ class _EventPeristenceQueue(Generic[_PersistResult]): Args: room_id (str): events_and_contexts (list[(EventBase, EventContext)]): - backfilled (bool): + should_calculate_state_and_forward_extrems: Determines whether we + need to calculate the state and new forward extremities for the + room. This should be set to false for backfilled events. + use_negative_stream_ordering: Whether to start stream_ordering on + the negative side and decrement. Usually this is done for any + backfilled event. + inhibit_local_membership_updates: Stop the local_current_membership + from being updated by these events. Usually this is done for + backfilled events. + update_room_forward_stream_ordering: Whether to update the + stream_ordering position to mark the latest event as the front + of the room. This should only be set as false for backfilled + events. Returns: the result returned by the `_per_item_callback` passed to @@ -170,7 +200,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]): # if the last item in the queue has the same `backfilled` setting, # we can just add these new events to that item. - if queue and queue[-1].backfilled == backfilled: + if ( + queue + and queue[-1].use_negative_stream_ordering == use_negative_stream_ordering + ): end_item = queue[-1] else: # need to make a new queue item @@ -180,7 +213,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]): end_item = _EventPersistQueueItem( events_and_contexts=[], - backfilled=backfilled, + should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=use_negative_stream_ordering, + inhibit_local_membership_updates=inhibit_local_membership_updates, + update_room_forward_stream_ordering=update_room_forward_stream_ordering, deferred=deferred, ) queue.append(end_item) @@ -241,7 +277,11 @@ class _EventPeristenceQueue(Generic[_PersistResult]): item.opentracing_span_context = scope.span.context ret = await self._per_item_callback( - item.events_and_contexts, item.backfilled + item.events_and_contexts, + should_calculate_state_and_forward_extrems=item.should_calculate_state_and_forward_extrems, + use_negative_stream_ordering=item.use_negative_stream_ordering, + inhibit_local_membership_updates=item.inhibit_local_membership_updates, + update_room_forward_stream_ordering=item.update_room_forward_stream_ordering, ) except Exception: with PreserveLoggingContext():