Fix event queue
This commit is contained in:
@@ -35,6 +35,7 @@ from typing import (
|
||||
)
|
||||
|
||||
import attr
|
||||
from mypy_extensions import DefaultNamedArg
|
||||
from prometheus_client import Counter, Histogram
|
||||
|
||||
from twisted.internet import defer
|
||||
@@ -112,7 +113,10 @@ times_pruned_extremities = Counter(
|
||||
@attr.s(auto_attribs=True, slots=True)
|
||||
class _EventPersistQueueItem:
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]]
|
||||
backfilled: bool
|
||||
should_calculate_state_and_forward_extrems: bool
|
||||
use_negative_stream_ordering: bool
|
||||
inhibit_local_membership_updates: bool
|
||||
update_room_forward_stream_ordering: bool
|
||||
deferred: ObservableDeferred
|
||||
|
||||
parent_opentracing_span_contexts: List = attr.ib(factory=list)
|
||||
@@ -133,7 +137,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
def __init__(
|
||||
self,
|
||||
per_item_callback: Callable[
|
||||
[List[Tuple[EventBase, EventContext]], bool],
|
||||
[
|
||||
List[Tuple[EventBase, EventContext]],
|
||||
DefaultNamedArg(
|
||||
bool, "should_calculate_state_and_forward_extrems" # noqa: F821
|
||||
),
|
||||
DefaultNamedArg(bool, "use_negative_stream_ordering"), # noqa: F821
|
||||
DefaultNamedArg(bool, "inhibit_local_membership_updates"), # noqa: F821
|
||||
DefaultNamedArg(
|
||||
bool, "update_room_forward_stream_ordering" # noqa: F821
|
||||
),
|
||||
],
|
||||
Awaitable[_PersistResult],
|
||||
],
|
||||
):
|
||||
@@ -150,7 +164,11 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
self,
|
||||
room_id: str,
|
||||
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool,
|
||||
*,
|
||||
should_calculate_state_and_forward_extrems: bool = True,
|
||||
use_negative_stream_ordering: bool = False,
|
||||
inhibit_local_membership_updates: bool = False,
|
||||
update_room_forward_stream_ordering: bool = True,
|
||||
) -> _PersistResult:
|
||||
"""Add events to the queue, with the given persist_event options.
|
||||
|
||||
@@ -160,7 +178,19 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
Args:
|
||||
room_id (str):
|
||||
events_and_contexts (list[(EventBase, EventContext)]):
|
||||
backfilled (bool):
|
||||
should_calculate_state_and_forward_extrems: Determines whether we
|
||||
need to calculate the state and new forward extremities for the
|
||||
room. This should be set to false for backfilled events.
|
||||
use_negative_stream_ordering: Whether to start stream_ordering on
|
||||
the negative side and decrement. Usually this is done for any
|
||||
backfilled event.
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. Usually this is done for
|
||||
backfilled events.
|
||||
update_room_forward_stream_ordering: Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should only be set as false for backfilled
|
||||
events.
|
||||
|
||||
Returns:
|
||||
the result returned by the `_per_item_callback` passed to
|
||||
@@ -170,7 +200,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
|
||||
# if the last item in the queue has the same `backfilled` setting,
|
||||
# we can just add these new events to that item.
|
||||
if queue and queue[-1].backfilled == backfilled:
|
||||
if (
|
||||
queue
|
||||
and queue[-1].use_negative_stream_ordering == use_negative_stream_ordering
|
||||
):
|
||||
end_item = queue[-1]
|
||||
else:
|
||||
# need to make a new queue item
|
||||
@@ -180,7 +213,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
|
||||
end_item = _EventPersistQueueItem(
|
||||
events_and_contexts=[],
|
||||
backfilled=backfilled,
|
||||
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
|
||||
use_negative_stream_ordering=use_negative_stream_ordering,
|
||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
|
||||
deferred=deferred,
|
||||
)
|
||||
queue.append(end_item)
|
||||
@@ -241,7 +277,11 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
item.opentracing_span_context = scope.span.context
|
||||
|
||||
ret = await self._per_item_callback(
|
||||
item.events_and_contexts, item.backfilled
|
||||
item.events_and_contexts,
|
||||
should_calculate_state_and_forward_extrems=item.should_calculate_state_and_forward_extrems,
|
||||
use_negative_stream_ordering=item.use_negative_stream_ordering,
|
||||
inhibit_local_membership_updates=item.inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering=item.update_room_forward_stream_ordering,
|
||||
)
|
||||
except Exception:
|
||||
with PreserveLoggingContext():
|
||||
|
||||
Reference in New Issue
Block a user