Compare commits

...

6 Commits

Author SHA1 Message Date
Eric Eastwood
f01d506fda Update comments docs to explain the why 2021-11-19 02:23:43 -06:00
Eric Eastwood
56ea69fecf Fix incompatible type
```
synapse/replication/http/federation.py:72: error: Signature of "_serialize_payload" incompatible with supertype "ReplicationEndpoint"  [override]
```
2021-11-19 01:51:05 -06:00
Eric Eastwood
ec82407b49 Add changelog 2021-11-19 00:43:32 -06:00
Eric Eastwood
d44fdcc8e6 Fix event queue 2021-11-19 00:39:02 -06:00
Eric Eastwood
b8c60b9ad2 Pipe arguments across the function stack 2021-11-18 20:09:38 -06:00
Eric Eastwood
d203d22a6f Refactor backfilled behavior into specific function parameters
Part of https://github.com/matrix-org/synapse/issues/11300
2021-11-18 19:37:39 -06:00
7 changed files with 376 additions and 57 deletions
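
The mypy error quoted in the "Fix incompatible type" commit above is mypy's `[override]` check: an overriding method must stay call-compatible with the method it replaces, and the reworked `_serialize_payload` signature no longer matched what `ReplicationEndpoint` declares. A generic illustration of the same class of error (hypothetical `Base`/`Child`, not the Synapse classes):

```
# Hypothetical Base/Child pair showing the kind of mismatch mypy flags: the
# override demands more required arguments than the supertype's signature
# allows, so code holding a Base reference could call it incorrectly.


class Base:
    def handle(self, store: object, backfilled: bool) -> None:
        ...


class Child(Base):
    # mypy: error: Signature of "handle" incompatible with supertype "Base"  [override]
    def handle(
        self,
        store: object,
        inhibit_push_notifications: bool,
        use_negative_stream_ordering: bool,
    ) -> None:
        ...
```

Typical remedies are giving the new parameters defaults or updating the supertype's declared signature so the override stays compatible.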

changelog.d/11396.misc (new file)

@@ -0,0 +1 @@
Refactor `backfilled` into specific behavior function arguments (`persist_events_and_notify` and downstream calls).
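
The changelog line summarizes the whole diff: the single `backfilled` boolean is split into keyword arguments that each name one behavior. A minimal runnable sketch of the call-site change, with a stub standing in for the real handler (the flag names and the backfill mapping come from the diff below; the stub itself is illustrative):

```
import asyncio
from typing import Any, Sequence


async def persist_events_and_notify(
    room_id: str,
    events: Sequence[Any],
    *,
    inhibit_push_notifications: bool = False,
    should_calculate_state_and_forward_extrems: bool = True,
    use_negative_stream_ordering: bool = False,
    inhibit_local_membership_updates: bool = False,
    update_room_forward_stream_ordering: bool = True,
) -> None:
    # Stub standing in for the real handler: just shows which behaviours apply.
    print(
        room_id,
        len(events),
        inhibit_push_notifications,
        should_calculate_state_and_forward_extrems,
        use_negative_stream_ordering,
        inhibit_local_membership_updates,
        update_room_forward_stream_ordering,
    )


async def main() -> None:
    backfilled = True  # e.g. events fetched over federation via backfill

    # Before: persist_events_and_notify(room_id, events, backfilled=backfilled)
    # After: each behaviour implied by `backfilled` is spelled out.
    await persist_events_and_notify(
        "!room:example.org",
        [],
        # No push notifications for events in the past.
        inhibit_push_notifications=backfilled,
        # State/forward extremities only need recalculating for new events.
        should_calculate_state_and_forward_extrems=not backfilled,
        # Negative stream orderings keep backfill out of incremental /sync.
        use_negative_stream_ordering=backfilled,
        # Events in the past do not change local_current_membership.
        inhibit_local_membership_updates=backfilled,
        # The room's stream_ordering tip only moves for new events.
        update_room_forward_stream_ordering=not backfilled,
    )


asyncio.run(main())
```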

View File

@@ -1809,7 +1809,24 @@ class FederationEventHandler:
try:
await self.persist_events_and_notify(
event.room_id, [(event, context)], backfilled=backfilled
event.room_id,
[(event, context)],
# We should not send notifications about backfilled events.
inhibit_push_notifications=backfilled,
# We don't need to calculate the state for backfilled events and
# there is no need to update the forward extrems because we
# already know this event happened in the past if it was
# backfilled.
should_calculate_state_and_forward_extrems=not backfilled,
# Backfilled events get a negative stream ordering so they don't
# come down incremental `/sync`
use_negative_stream_ordering=backfilled,
# Backfilled events do not affect the current local state
inhibit_local_membership_updates=backfilled,
# Backfilled events have negative stream_ordering and happened
# in the past so we know that we don't need to update the
# stream_ordering tip for the room.
update_room_forward_stream_ordering=not backfilled,
)
except Exception:
run_in_background(
@@ -1821,7 +1838,12 @@ class FederationEventHandler:
self,
room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
*,
inhibit_push_notifications: bool = False,
should_calculate_state_and_forward_extrems: bool = True,
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
update_room_forward_stream_ordering: bool = True,
) -> int:
"""Persists events and tells the notifier/pushers about them, if
necessary.
@@ -1831,8 +1853,31 @@ class FederationEventHandler:
event_and_contexts: Sequence of events with their associated
context that should be persisted. All events must belong to
the same room.
backfilled: Whether these events are a result of
backfilling or not
inhibit_push_notifications: Whether to stop the notifiers/pushers
from knowing about the event. This should be set as True
for backfilled events because there is no need to send push
notifications for events in the past.
should_calculate_state_and_forward_extrems: Determines whether we
need to calculate the state and new forward extremities for the
room. This should be set to false for backfilled events because
we don't need to calculate the state for backfilled events and
there is no need to update the forward extrems because we
already know this event happened in the past if it was
backfilled.
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.
Returns:
The stream ID after which all events have been persisted.
@@ -1850,7 +1895,7 @@ class FederationEventHandler:
store=self._store,
room_id=room_id,
event_and_contexts=batch,
backfilled=backfilled,
inhibit_push_notifications=inhibit_push_notifications,
)
return result["max_stream_id"]
else:
@@ -1859,7 +1904,11 @@ class FederationEventHandler:
# Note that this returns the events that were persisted, which may not be
# the same as were passed in if some were deduplicated due to transaction IDs.
events, max_stream_token = await self._storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
event_and_contexts,
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
use_negative_stream_ordering=use_negative_stream_ordering,
inhibit_local_membership_updates=inhibit_local_membership_updates,
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
)
if self._ephemeral_messages_enabled:
@@ -1867,7 +1916,7 @@ class FederationEventHandler:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
if not backfilled: # Never notify for backfilled events
if not inhibit_push_notifications: # Never notify for backfilled events
for event in events:
await self._notify_persisted_event(event, max_stream_token)
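
Note that every reworked signature in this diff also adds a bare `*`, making the new flags keyword-only. A tiny sketch (hypothetical `persist` function) of why that matters when several look-alike booleans travel together:

```
# With keyword-only flags, a stray positional boolean is rejected instead of
# silently landing in whichever parameter happens to come first.
def persist(events: list, *, use_negative_stream_ordering: bool = False) -> None:
    print(len(events), use_negative_stream_ordering)


persist([], use_negative_stream_ordering=True)  # OK
# persist([], True)  # TypeError: persist() takes 1 positional argument but 2 were given
```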

View File

@@ -1565,12 +1565,6 @@ class EventCreationHandler:
errcode=Codes.INVALID_PARAM,
)
# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
if event.internal_metadata.is_historical():
backfilled = True
# Note that this returns the event that was persisted, which may not be
# the same as we passed in if it was deduplicated due to transaction IDs.
(
@@ -1578,7 +1572,17 @@ class EventCreationHandler:
event_pos,
max_stream_token,
) = await self.storage.persistence.persist_event(
event, context=context, backfilled=backfilled
event,
context=context,
# Make any historical messages behave like backfilled events
should_calculate_state_and_forward_extrems=not event.internal_metadata.is_historical(),
# We use a negative `stream_ordering` for historical messages so
# they don't come down an incremental `/sync` and have the proper
# decrementing `stream_ordering` as we import so they sort
# as expected between two depths.
use_negative_stream_ordering=event.internal_metadata.is_historical(),
inhibit_local_membership_updates=event.internal_metadata.is_historical(),
update_room_forward_stream_ordering=not event.internal_metadata.is_historical(),
)
if self._ephemeral_events_enabled:
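
Here every flag is derived from `event.internal_metadata.is_historical()`. If that pattern is needed elsewhere, a small helper could compute the bundle once; the function below is purely a hypothetical sketch, not part of the PR:

```
from typing import Any, Dict


def backfill_style_persist_kwargs(treat_as_past: bool) -> Dict[str, Any]:
    """Hypothetical helper: map one 'this event belongs to the past' condition
    onto the four persistence flags introduced in this diff."""
    return {
        "should_calculate_state_and_forward_extrems": not treat_as_past,
        "use_negative_stream_ordering": treat_as_past,
        "inhibit_local_membership_updates": treat_as_past,
        "update_room_forward_stream_ordering": not treat_as_past,
    }


# e.g. persist_event(event, context=context,
#          **backfill_style_persist_kwargs(event.internal_metadata.is_historical()))
print(backfill_style_persist_kwargs(True))
```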

synapse/replication/http/federation.py

@@ -47,7 +47,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
}],
"backfilled": false
"inhibit_push_notifications": false
}
200 OK
@@ -69,14 +69,46 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.federation_event_handler = hs.get_federation_event_handler()
@staticmethod
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
async def _serialize_payload(
store,
room_id,
event_and_contexts,
inhibit_push_notifications,
should_calculate_state_and_forward_extrems,
use_negative_stream_ordering,
inhibit_local_membership_updates,
update_room_forward_stream_ordering,
):
"""
Args:
store
room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
inhibit_push_notifications: Whether to stop the notifiers/pushers
from knowing about the event. This should be set as True
for backfilled events because there is no need to send push
notifications for events in the past.
should_calculate_state_and_forward_extrems: Determines whether we
need to calculate the state and new forward extremities for the
room. This should be set to false for backfilled events because
we don't need to calculate the state for backfilled events and
there is no need to update the forward extrems because we
already know this event happened in the past if it was
backfilled.
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.
"""
event_payloads = []
for event, context in event_and_contexts:
@@ -96,7 +128,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
payload = {
"events": event_payloads,
"backfilled": backfilled,
"inhibit_push_notifications": inhibit_push_notifications,
"should_calculate_state_and_forward_extrems": should_calculate_state_and_forward_extrems,
"use_negative_stream_ordering": use_negative_stream_ordering,
"inhibit_local_membership_updates": inhibit_local_membership_updates,
"update_room_forward_stream_ordering": update_room_forward_stream_ordering,
"room_id": room_id,
}
@@ -107,7 +143,23 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
content = parse_json_object_from_request(request)
room_id = content["room_id"]
backfilled = content["backfilled"]
inhibit_push_notifications = content["inhibit_push_notifications"]
should_calculate_state_and_forward_extrems = content[
"should_calculate_state_and_forward_extrems"
]
use_negative_stream_ordering = content["use_negative_stream_ordering"]
inhibit_local_membership_updates = content[
"inhibit_local_membership_updates"
]
update_room_forward_stream_ordering = content[
"update_room_forward_stream_ordering"
]
assert inhibit_push_notifications is not None
assert should_calculate_state_and_forward_extrems is not None
assert use_negative_stream_ordering is not None
assert inhibit_local_membership_updates is not None
assert update_room_forward_stream_ordering is not None
event_payloads = content["events"]
@@ -132,7 +184,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
logger.info("Got %d events from federation", len(event_and_contexts))
max_stream_id = await self.federation_event_handler.persist_events_and_notify(
room_id, event_and_contexts, backfilled
room_id,
event_and_contexts,
inhibit_push_notifications=inhibit_push_notifications,
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
use_negative_stream_ordering=use_negative_stream_ordering,
inhibit_local_membership_updates=inhibit_local_membership_updates,
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
)
return 200, {"max_stream_id": max_stream_id}
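
After this change the replication request body carries the five explicit booleans instead of a single `backfilled` field, which is what the `assert ... is not None` checks above guard. An illustrative payload for a batch of backfilled events (field names from the diff; the room ID and values are made up):

```
import json

payload = {
    "events": [],  # serialized event/context payloads elided
    "inhibit_push_notifications": True,
    "should_calculate_state_and_forward_extrems": False,
    "use_negative_stream_ordering": True,
    "inhibit_local_membership_updates": True,
    "update_room_forward_stream_ordering": False,
    "room_id": "!room:example.org",
}
print(json.dumps(payload, indent=2))
```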

View File

@@ -121,10 +121,13 @@ class PersistEventsStore:
async def _persist_events_and_state_updates(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
*,
current_state_for_room: Dict[str, StateMap[str]],
state_delta_for_room: Dict[str, DeltaState],
new_forward_extremeties: Dict[str, List[str]],
backfilled: bool = False,
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
update_room_forward_stream_ordering: bool = True,
) -> None:
"""Persist a set of events alongside updates to the current state and
forward extremities tables.
@@ -137,7 +140,20 @@ class PersistEventsStore:
room state
new_forward_extremities: Map from room_id to list of event IDs
that are the new forward extremities of the room.
backfilled
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.
Returns:
Resolves when the events have been persisted
@@ -159,7 +175,7 @@ class PersistEventsStore:
#
# Note: Multiple instances of this function cannot be in flight at
# the same time for the same room.
if backfilled:
if use_negative_stream_ordering:
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(events_and_contexts)
)
@@ -176,13 +192,15 @@ class PersistEventsStore:
"persist_events",
self._persist_events_txn,
events_and_contexts=events_and_contexts,
backfilled=backfilled,
inhibit_local_membership_updates=inhibit_local_membership_updates,
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc(len(events_and_contexts))
if not backfilled:
# TODO: test that this actually works
if stream < 0:
# backfilled events have negative stream orderings, so we don't
# want to set the event_persisted_position to that.
synapse.metrics.event_persisted_position.set(
@@ -316,8 +334,10 @@ class PersistEventsStore:
def _persist_events_txn(
self,
txn: LoggingTransaction,
*,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
inhibit_local_membership_updates: bool = False,
update_room_forward_stream_ordering: bool = True,
state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
):
@@ -331,11 +351,21 @@ class PersistEventsStore:
txn
events_and_contexts: events to persist
backfilled: True if the events were backfilled
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.
delete_existing True to purge existing table rows for the events
from the database. This is useful when retrying due to
IntegrityError.
state_delta_for_room: The current-state delta for each room.
new_forward_extremetie: The new forward extremities for each room.
new_forward_extremeties: The new forward extremities for each room.
For each room, a list of the event ids which are the forward
extremities.
@@ -364,7 +394,9 @@ class PersistEventsStore:
)
self._update_room_depths_txn(
txn, events_and_contexts=events_and_contexts, backfilled=backfilled
txn,
events_and_contexts=events_and_contexts,
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
)
# _update_outliers_txn filters out any events which have already been
@@ -398,7 +430,7 @@ class PersistEventsStore:
txn,
events_and_contexts=events_and_contexts,
all_events_and_contexts=all_events_and_contexts,
backfilled=backfilled,
inhibit_local_membership_updates=inhibit_local_membership_updates,
)
# We call this last as it assumes we've inserted the events into
@@ -1200,7 +1232,8 @@ class PersistEventsStore:
self,
txn,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool,
*,
update_room_forward_stream_ordering: bool = True,
):
"""Update min_depth for each room
@@ -1208,13 +1241,18 @@ class PersistEventsStore:
txn (twisted.enterprise.adbapi.Connection): db connection
events_and_contexts (list[(EventBase, EventContext)]): events
we are persisting
backfilled (bool): True if the events were backfilled
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.
"""
depth_updates: Dict[str, int] = {}
for event, context in events_and_contexts:
# Remove any existing cache entries for the event_ids
txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
if not backfilled:
if update_room_forward_stream_ordering:
txn.call_after(
self.store._events_stream_cache.entity_has_changed,
event.room_id,
@@ -1427,7 +1465,12 @@ class PersistEventsStore:
return [ec for ec in events_and_contexts if ec[0] not in to_remove]
def _update_metadata_tables_txn(
self, txn, events_and_contexts, all_events_and_contexts, backfilled
self,
txn,
*,
events_and_contexts,
all_events_and_contexts,
inhibit_local_membership_updates: bool = False,
):
"""Update all the miscellaneous tables for new events
@@ -1439,7 +1482,10 @@ class PersistEventsStore:
events that we were going to persist. This includes events
we've already persisted, etc, that wouldn't appear in
events_and_context.
backfilled (bool): True if the events were backfilled
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
"""
# Insert all the push actions into the event_push_actions table.
@@ -1513,7 +1559,7 @@ class PersistEventsStore:
for event, _ in events_and_contexts
if event.type == EventTypes.Member
],
backfilled=backfilled,
inhibit_local_membership_updates=inhibit_local_membership_updates,
)
# Insert event_reference_hashes table.
@@ -1638,8 +1684,20 @@ class PersistEventsStore:
txn, table="event_reference_hashes", values=vals
)
def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database."""
def _store_room_members_txn(
self, txn, events, *, inhibit_local_membership_updates: bool = False
):
"""
Store a room member in the database.
Args:
txn: The transaction to use.
events: List of events to store.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
"""
def non_null_str_or_none(val: Any) -> Optional[str]:
return val if isinstance(val, str) and "\u0000" not in val else None
@@ -1682,7 +1740,7 @@ class PersistEventsStore:
# band membership", like a remote invite or a rejection of a remote invite.
if (
self.is_mine_id(event.state_key)
and not backfilled
and not inhibit_local_membership_updates
and event.internal_metadata.is_outlier()
and event.internal_metadata.is_out_of_band_membership()
):
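
Earlier in this file, `use_negative_stream_ordering` selects `self._backfill_id_gen`, so backfilled events receive decreasing negative stream orderings and therefore sort before anything a client has already synced. A toy illustration of the two counters (not Synapse's actual ID generators):

```
import itertools

# Live events count up from the current tip; backfilled events count down
# from -1, so backfill never appears ahead of an existing /sync position.
forward_ids = itertools.count(start=1)             # 1, 2, 3, ...
backfill_ids = itertools.count(start=-1, step=-1)  # -1, -2, -3, ...

live = [next(forward_ids) for _ in range(3)]
backfilled = [next(backfill_ids) for _ in range(3)]
print(live, backfilled)  # [1, 2, 3] [-1, -2, -3]
```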

View File

@@ -35,6 +35,7 @@ from typing import (
)
import attr
from mypy_extensions import DefaultNamedArg
from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -112,7 +113,10 @@ times_pruned_extremities = Counter(
@attr.s(auto_attribs=True, slots=True)
class _EventPersistQueueItem:
events_and_contexts: List[Tuple[EventBase, EventContext]]
backfilled: bool
should_calculate_state_and_forward_extrems: bool
use_negative_stream_ordering: bool
inhibit_local_membership_updates: bool
update_room_forward_stream_ordering: bool
deferred: ObservableDeferred
parent_opentracing_span_contexts: List = attr.ib(factory=list)
@@ -133,7 +137,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
def __init__(
self,
per_item_callback: Callable[
[List[Tuple[EventBase, EventContext]], bool],
[
List[Tuple[EventBase, EventContext]],
DefaultNamedArg(
bool, "should_calculate_state_and_forward_extrems" # noqa: F821
),
DefaultNamedArg(bool, "use_negative_stream_ordering"), # noqa: F821
DefaultNamedArg(bool, "inhibit_local_membership_updates"), # noqa: F821
DefaultNamedArg(
bool, "update_room_forward_stream_ordering" # noqa: F821
),
],
Awaitable[_PersistResult],
],
):
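
The callback type above uses `mypy_extensions.DefaultNamedArg` because plain `Callable[[...], ...]` syntax cannot describe optional keyword arguments. A self-contained sketch of the same construct (the `PersistCallback` alias and `persist` function are hypothetical):

```
from typing import Callable

from mypy_extensions import DefaultNamedArg

# DefaultNamedArg(type, "name") marks an optional keyword argument inside a
# Callable type, mirroring how the persist-queue callback is declared above.
PersistCallback = Callable[
    [list, DefaultNamedArg(bool, "use_negative_stream_ordering")],  # noqa: F821
    int,
]


def persist(events: list, *, use_negative_stream_ordering: bool = False) -> int:
    return -len(events) if use_negative_stream_ordering else len(events)


cb: PersistCallback = persist
print(cb([1, 2], use_negative_stream_ordering=True))  # -2
```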
@@ -150,7 +164,11 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
self,
room_id: str,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
backfilled: bool,
*,
should_calculate_state_and_forward_extrems: bool = True,
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
update_room_forward_stream_ordering: bool = True,
) -> _PersistResult:
"""Add events to the queue, with the given persist_event options.
@@ -160,7 +178,27 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
Args:
room_id (str):
events_and_contexts (list[(EventBase, EventContext)]):
backfilled (bool):
should_calculate_state_and_forward_extrems: Determines whether we
need to calculate the state and new forward extremities for the
room. This should be set to false for backfilled events because
we don't need to calculate the state for backfilled events and
there is no need to update the forward extrems because we
already know this event happened in the past if it was
backfilled.
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.
Returns:
the result returned by the `_per_item_callback` passed to
@@ -170,7 +208,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
# if the last item in the queue has the same `backfilled` setting,
# we can just add these new events to that item.
if queue and queue[-1].backfilled == backfilled:
if (
queue
and queue[-1].use_negative_stream_ordering == use_negative_stream_ordering
):
end_item = queue[-1]
else:
# need to make a new queue item
@@ -180,7 +221,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
end_item = _EventPersistQueueItem(
events_and_contexts=[],
backfilled=backfilled,
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
use_negative_stream_ordering=use_negative_stream_ordering,
inhibit_local_membership_updates=inhibit_local_membership_updates,
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
deferred=deferred,
)
queue.append(end_item)
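
The queue now batches on `use_negative_stream_ordering` where it previously compared `backfilled`: consecutive persist requests are merged into one queue item only when they want the same ordering direction. A simplified sketch of that batching decision (not the real `_EventPeristenceQueue`):

```
from dataclasses import dataclass, field
from typing import List


@dataclass
class QueueItem:
    use_negative_stream_ordering: bool
    events: List[str] = field(default_factory=list)


def add_to_queue(
    queue: List[QueueItem], event_id: str, *, use_negative_stream_ordering: bool
) -> None:
    # Reuse the tail item only when it was queued with the same ordering
    # direction; otherwise start a new batch.
    if queue and queue[-1].use_negative_stream_ordering == use_negative_stream_ordering:
        queue[-1].events.append(event_id)
    else:
        queue.append(QueueItem(use_negative_stream_ordering, [event_id]))


q: List[QueueItem] = []
for event_id, backfill in [("$a", False), ("$b", False), ("$c", True)]:
    add_to_queue(q, event_id, use_negative_stream_ordering=backfill)
print([(item.use_negative_stream_ordering, item.events) for item in q])
# [(False, ['$a', '$b']), (True, ['$c'])]
```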
@@ -241,7 +285,11 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
item.opentracing_span_context = scope.span.context
ret = await self._per_item_callback(
item.events_and_contexts, item.backfilled
item.events_and_contexts,
should_calculate_state_and_forward_extrems=item.should_calculate_state_and_forward_extrems,
use_negative_stream_ordering=item.use_negative_stream_ordering,
inhibit_local_membership_updates=item.inhibit_local_membership_updates,
update_room_forward_stream_ordering=item.update_room_forward_stream_ordering,
)
except Exception:
with PreserveLoggingContext():
@@ -296,15 +344,37 @@ class EventsPersistenceStorage:
async def persist_events(
self,
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
backfilled: bool = False,
*,
should_calculate_state_and_forward_extrems: bool = True,
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
update_room_forward_stream_ordering: bool = True,
) -> Tuple[List[EventBase], RoomStreamToken]:
"""
Write events to the database
Args:
events_and_contexts: list of tuples of (event, context)
backfilled: Whether the results are retrieved from federation
via backfill or not. Used to determine if they're "new" events
which might update the current state etc.
should_calculate_state_and_forward_extrems: Determines whether we
need to calculate the state and new forward extremities for the
room. This should be set to false for backfilled events because
we don't need to calculate the state for backfilled events and
there is no need to update the forward extrems because we
already know this event happened in the past if it was
backfilled.
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.
Returns:
List of events persisted, the current position room stream position.
@@ -320,7 +390,12 @@ class EventsPersistenceStorage:
async def enqueue(item):
room_id, evs_ctxs = item
return await self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, backfilled=backfilled
room_id,
evs_ctxs,
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
use_negative_stream_ordering=use_negative_stream_ordering,
inhibit_local_membership_updates=inhibit_local_membership_updates,
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
)
ret_vals = await yieldable_gather_results(enqueue, partitioned.items())
@@ -350,9 +425,43 @@ class EventsPersistenceStorage:
@opentracing.trace
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
self,
event: EventBase,
context: EventContext,
*,
should_calculate_state_and_forward_extrems: bool = True,
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
update_room_forward_stream_ordering: bool = True,
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
"""
Write a single event to the database.
Args:
event:
context:
should_calculate_state_and_forward_extrems: Determines whether we
need to calculate the state and new forward extremities for the
room. This should be set to false for backfilled events because
we don't need to calculate the state for backfilled events and
there is no need to update the forward extrems because we
already know this event happened in the past if it was
backfilled.
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.
Returns:
The event, stream ordering of `event`, and the stream ordering of the
latest persisted event. The returned event may not match the given
@@ -363,7 +472,12 @@ class EventsPersistenceStorage:
# event was deduplicated. (The dict may also include other entries if
# the event was persisted in a batch with other events.)
replaced_events = await self._event_persist_queue.add_to_queue(
event.room_id, [(event, context)], backfilled=backfilled
event.room_id,
[(event, context)],
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
use_negative_stream_ordering=use_negative_stream_ordering,
inhibit_local_membership_updates=inhibit_local_membership_updates,
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
)
replaced_event = replaced_events.get(event.event_id)
if replaced_event:
@@ -379,13 +493,41 @@ class EventsPersistenceStorage:
async def _persist_event_batch(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
backfilled: bool = False,
*,
should_calculate_state_and_forward_extrems: bool = True,
use_negative_stream_ordering: bool = False,
inhibit_local_membership_updates: bool = False,
update_room_forward_stream_ordering: bool = True,
) -> Dict[str, str]:
"""Callback for the _event_persist_queue
Calculates the change to current state and forward extremities, and
persists the given events and with those updates.
Args:
events_and_contexts:
should_calculate_state_and_forward_extrems: Determines whether we
need to calculate the state and new forward extremities for the
room. This should be set to false for backfilled events because
we don't need to calculate the state for backfilled events and
there is no need to update the forward extrems because we
already know this event happened in the past if it was
backfilled.
use_negative_stream_ordering: Whether to start stream_ordering on
the negative side and decrement. This should be set as True
for backfilled events because backfilled events get a negative
stream ordering so they don't come down incremental `/sync`.
inhibit_local_membership_updates: Stop the local_current_membership
from being updated by these events. This should be set to True
for backfilled events because backfilled events in the past do
not affect the current local state.
update_room_forward_stream_ordering: Whether to update the
stream_ordering position to mark the latest event as the front
of the room. This should be set as False for backfilled
events because backfilled events have negative stream_ordering
and happened in the past so we know that we don't need to
update the stream_ordering tip for the room.
Returns:
A dictionary of event ID to event ID we didn't persist as we already
had another event persisted with the same TXN ID.
@@ -448,7 +590,7 @@ class EventsPersistenceStorage:
# device lists as stale.
potentially_left_users: Set[str] = set()
if not backfilled:
if should_calculate_state_and_forward_extrems:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
# We do this by working out what the new extremities are and then
@@ -583,7 +725,9 @@ class EventsPersistenceStorage:
current_state_for_room=current_state_for_room,
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
backfilled=backfilled,
use_negative_stream_ordering=use_negative_stream_ordering,
inhibit_local_membership_updates=inhibit_local_membership_updates,
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
)
await self._handle_potentially_left_users(potentially_left_users)

View File

@@ -323,7 +323,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
if backfill:
self.get_success(
self.storage.persistence.persist_events(
[(event, context)], backfilled=True
[(event, context)],
# Backfilled event
should_calculate_state_and_forward_extrems=False,
use_negative_stream_ordering=True,
inhibit_local_membership_updates=True,
update_room_forward_stream_ordering=False,
)
)
else: