Compare commits
6 Commits
v1.140.0rc
...
madlittlem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f01d506fda | ||
|
|
56ea69fecf | ||
|
|
ec82407b49 | ||
|
|
d44fdcc8e6 | ||
|
|
b8c60b9ad2 | ||
|
|
d203d22a6f |
1
changelog.d/11396.misc
Normal file
1
changelog.d/11396.misc
Normal file
@@ -0,0 +1 @@
|
||||
Refactor `backfilled` into specific behavior function arguments (`persist_events_and_notify` and downstream calls).
|
||||
@@ -1809,7 +1809,24 @@ class FederationEventHandler:
|
||||
|
||||
try:
|
||||
await self.persist_events_and_notify(
|
||||
event.room_id, [(event, context)], backfilled=backfilled
|
||||
event.room_id,
|
||||
[(event, context)],
|
||||
# We should not send notifications about backfilled events.
|
||||
inhibit_push_notifications=backfilled,
|
||||
# We don't need to calculate the state for backfilled events and
|
||||
# there is no need to update the forward extrems because we
|
||||
# already know this event happened in the past if it was
|
||||
# backfilled.
|
||||
should_calculate_state_and_forward_extrems=not backfilled,
|
||||
# Backfilled events get a negative stream ordering so they don't
|
||||
# come down incremental `/sync`
|
||||
use_negative_stream_ordering=backfilled,
|
||||
# Backfilled events do not affect the current local state
|
||||
inhibit_local_membership_updates=backfilled,
|
||||
# Backfilled events have negative stream_ordering and happened
|
||||
# in the past so we know that we don't need to update the
|
||||
# stream_ordering tip for the room.
|
||||
update_room_forward_stream_ordering=not backfilled,
|
||||
)
|
||||
except Exception:
|
||||
run_in_background(
|
||||
@@ -1821,7 +1838,12 @@ class FederationEventHandler:
|
||||
self,
|
||||
room_id: str,
|
||||
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool = False,
|
||||
*,
|
||||
inhibit_push_notifications: bool = False,
|
||||
should_calculate_state_and_forward_extrems: bool = True,
|
||||
use_negative_stream_ordering: bool = False,
|
||||
inhibit_local_membership_updates: bool = False,
|
||||
update_room_forward_stream_ordering: bool = True,
|
||||
) -> int:
|
||||
"""Persists events and tells the notifier/pushers about them, if
|
||||
necessary.
|
||||
@@ -1831,8 +1853,31 @@ class FederationEventHandler:
|
||||
event_and_contexts: Sequence of events with their associated
|
||||
context that should be persisted. All events must belong to
|
||||
the same room.
|
||||
backfilled: Whether these events are a result of
|
||||
backfilling or not
|
||||
inhibit_push_notifications: Whether to stop the notifiers/pushers
|
||||
from knowing about the event. This should be set as True
|
||||
for backfilled events because there is no need to send push
|
||||
notifications for events in the past.
|
||||
should_calculate_state_and_forward_extrems: Determines whether we
|
||||
need to calculate the state and new forward extremities for the
|
||||
room. This should be set to false for backfilled events because
|
||||
we don't need to calculate the state for backfilled events and
|
||||
there is no need to update the forward extrems because we
|
||||
already know this event happened in the past if it was
|
||||
backfilled.
|
||||
use_negative_stream_ordering: Whether to start stream_ordering on
|
||||
the negative side and decrement. This should be set as True
|
||||
for backfilled events because backfilled events get a negative
|
||||
stream ordering so they don't come down incremental `/sync`.
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. This should be set to True
|
||||
for backfilled events because backfilled events in the past do
|
||||
not affect the current local state.
|
||||
update_room_forward_stream_ordering: Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should be set as False for backfilled
|
||||
events because backfilled events have negative stream_ordering
|
||||
and happened in the past so we know that we don't need to
|
||||
update the stream_ordering tip for the room.
|
||||
|
||||
Returns:
|
||||
The stream ID after which all events have been persisted.
|
||||
@@ -1850,7 +1895,7 @@ class FederationEventHandler:
|
||||
store=self._store,
|
||||
room_id=room_id,
|
||||
event_and_contexts=batch,
|
||||
backfilled=backfilled,
|
||||
inhibit_push_notifications=inhibit_push_notifications,
|
||||
)
|
||||
return result["max_stream_id"]
|
||||
else:
|
||||
@@ -1859,7 +1904,11 @@ class FederationEventHandler:
|
||||
# Note that this returns the events that were persisted, which may not be
|
||||
# the same as were passed in if some were deduplicated due to transaction IDs.
|
||||
events, max_stream_token = await self._storage.persistence.persist_events(
|
||||
event_and_contexts, backfilled=backfilled
|
||||
event_and_contexts,
|
||||
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
|
||||
use_negative_stream_ordering=use_negative_stream_ordering,
|
||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
|
||||
)
|
||||
|
||||
if self._ephemeral_messages_enabled:
|
||||
@@ -1867,7 +1916,7 @@ class FederationEventHandler:
|
||||
# If there's an expiry timestamp on the event, schedule its expiry.
|
||||
self._message_handler.maybe_schedule_expiry(event)
|
||||
|
||||
if not backfilled: # Never notify for backfilled events
|
||||
if not inhibit_push_notifications: # Never notify for backfilled events
|
||||
for event in events:
|
||||
await self._notify_persisted_event(event, max_stream_token)
|
||||
|
||||
|
||||
@@ -1565,12 +1565,6 @@ class EventCreationHandler:
|
||||
errcode=Codes.INVALID_PARAM,
|
||||
)
|
||||
|
||||
# Mark any `m.historical` messages as backfilled so they don't appear
|
||||
# in `/sync` and have the proper decrementing `stream_ordering` as we import
|
||||
backfilled = False
|
||||
if event.internal_metadata.is_historical():
|
||||
backfilled = True
|
||||
|
||||
# Note that this returns the event that was persisted, which may not be
|
||||
# the same as we passed in if it was deduplicated due transaction IDs.
|
||||
(
|
||||
@@ -1578,7 +1572,17 @@ class EventCreationHandler:
|
||||
event_pos,
|
||||
max_stream_token,
|
||||
) = await self.storage.persistence.persist_event(
|
||||
event, context=context, backfilled=backfilled
|
||||
event,
|
||||
context=context,
|
||||
# Make any historical messages behave like backfilled events
|
||||
should_calculate_state_and_forward_extrems=not event.internal_metadata.is_historical(),
|
||||
# We use a negative `stream_ordering`` for historical messages so
|
||||
# they don't come down an incremental `/sync` and have the proper
|
||||
# decrementing `stream_ordering` as we import so they sort
|
||||
# as expected between two depths.
|
||||
use_negative_stream_ordering=event.internal_metadata.is_historical(),
|
||||
inhibit_local_membership_updates=event.internal_metadata.is_historical(),
|
||||
update_room_forward_stream_ordering=not event.internal_metadata.is_historical(),
|
||||
)
|
||||
|
||||
if self._ephemeral_events_enabled:
|
||||
|
||||
@@ -47,7 +47,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
"rejected_reason": .., // The event.rejected_reason field
|
||||
"context": { .. serialized event context .. },
|
||||
}],
|
||||
"backfilled": false
|
||||
"inhibit_push_notifications": false
|
||||
}
|
||||
|
||||
200 OK
|
||||
@@ -69,14 +69,46 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
self.federation_event_handler = hs.get_federation_event_handler()
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
|
||||
async def _serialize_payload(
|
||||
store,
|
||||
room_id,
|
||||
event_and_contexts,
|
||||
inhibit_push_notifications,
|
||||
should_calculate_state_and_forward_extrems,
|
||||
use_negative_stream_ordering,
|
||||
inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
store
|
||||
room_id (str)
|
||||
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
|
||||
backfilled (bool): Whether or not the events are the result of
|
||||
backfilling
|
||||
inhibit_push_notifications: Whether to stop the notifiers/pushers
|
||||
from knowing about the event. This should be set as True
|
||||
for backfilled events because there is no need to send push
|
||||
notifications for events in the past.
|
||||
should_calculate_state_and_forward_extrems: Determines whether we
|
||||
need to calculate the state and new forward extremities for the
|
||||
room. This should be set to false for backfilled events because
|
||||
we don't need to calculate the state for backfilled events and
|
||||
there is no need to update the forward extrems because we
|
||||
already know this event happened in the past if it was
|
||||
backfilled.
|
||||
use_negative_stream_ordering: Whether to start stream_ordering on
|
||||
the negative side and decrement. This should be set as True
|
||||
for backfilled events because backfilled events get a negative
|
||||
stream ordering so they don't come down incremental `/sync`.
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. This should be set to True
|
||||
for backfilled events because backfilled events in the past do
|
||||
not affect the current local state.
|
||||
update_room_forward_stream_ordering: Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should be set as False for backfilled
|
||||
events because backfilled events have negative stream_ordering
|
||||
and happened in the past so we know that we don't need to
|
||||
update the stream_ordering tip for the room.
|
||||
"""
|
||||
event_payloads = []
|
||||
for event, context in event_and_contexts:
|
||||
@@ -96,7 +128,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
|
||||
payload = {
|
||||
"events": event_payloads,
|
||||
"backfilled": backfilled,
|
||||
"inhibit_push_notifications": inhibit_push_notifications,
|
||||
"should_calculate_state_and_forward_extrems": should_calculate_state_and_forward_extrems,
|
||||
"use_negative_stream_ordering": use_negative_stream_ordering,
|
||||
"inhibit_local_membership_updates": inhibit_local_membership_updates,
|
||||
"update_room_forward_stream_ordering": update_room_forward_stream_ordering,
|
||||
"room_id": room_id,
|
||||
}
|
||||
|
||||
@@ -107,7 +143,23 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
room_id = content["room_id"]
|
||||
backfilled = content["backfilled"]
|
||||
inhibit_push_notifications = content["inhibit_push_notifications"]
|
||||
should_calculate_state_and_forward_extrems = content[
|
||||
"should_calculate_state_and_forward_extrems"
|
||||
]
|
||||
use_negative_stream_ordering = content["use_negative_stream_ordering"]
|
||||
inhibit_local_membership_updates = content[
|
||||
"inhibit_local_membership_updates"
|
||||
]
|
||||
update_room_forward_stream_ordering = content[
|
||||
"update_room_forward_stream_ordering"
|
||||
]
|
||||
|
||||
assert inhibit_push_notifications is not None
|
||||
assert should_calculate_state_and_forward_extrems is not None
|
||||
assert use_negative_stream_ordering is not None
|
||||
assert inhibit_local_membership_updates is not None
|
||||
assert update_room_forward_stream_ordering is not None
|
||||
|
||||
event_payloads = content["events"]
|
||||
|
||||
@@ -132,7 +184,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
logger.info("Got %d events from federation", len(event_and_contexts))
|
||||
|
||||
max_stream_id = await self.federation_event_handler.persist_events_and_notify(
|
||||
room_id, event_and_contexts, backfilled
|
||||
room_id,
|
||||
event_and_contexts,
|
||||
inhibit_push_notifications=inhibit_push_notifications,
|
||||
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
|
||||
use_negative_stream_ordering=use_negative_stream_ordering,
|
||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
|
||||
)
|
||||
|
||||
return 200, {"max_stream_id": max_stream_id}
|
||||
|
||||
@@ -121,10 +121,13 @@ class PersistEventsStore:
|
||||
async def _persist_events_and_state_updates(
|
||||
self,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
*,
|
||||
current_state_for_room: Dict[str, StateMap[str]],
|
||||
state_delta_for_room: Dict[str, DeltaState],
|
||||
new_forward_extremeties: Dict[str, List[str]],
|
||||
backfilled: bool = False,
|
||||
use_negative_stream_ordering: bool = False,
|
||||
inhibit_local_membership_updates: bool = False,
|
||||
update_room_forward_stream_ordering: bool = True,
|
||||
) -> None:
|
||||
"""Persist a set of events alongside updates to the current state and
|
||||
forward extremities tables.
|
||||
@@ -137,7 +140,20 @@ class PersistEventsStore:
|
||||
room state
|
||||
new_forward_extremities: Map from room_id to list of event IDs
|
||||
that are the new forward extremities of the room.
|
||||
backfilled
|
||||
use_negative_stream_ordering: Whether to start stream_ordering on
|
||||
the negative side and decrement. This should be set as True
|
||||
for backfilled events because backfilled events get a negative
|
||||
stream ordering so they don't come down incremental `/sync`.
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. This should be set to True
|
||||
for backfilled events because backfilled events in the past do
|
||||
not affect the current local state.
|
||||
update_room_forward_stream_ordering: Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should be set as False for backfilled
|
||||
events because backfilled events have negative stream_ordering
|
||||
and happened in the past so we know that we don't need to
|
||||
update the stream_ordering tip for the room.
|
||||
|
||||
Returns:
|
||||
Resolves when the events have been persisted
|
||||
@@ -159,7 +175,7 @@ class PersistEventsStore:
|
||||
#
|
||||
# Note: Multiple instances of this function cannot be in flight at
|
||||
# the same time for the same room.
|
||||
if backfilled:
|
||||
if use_negative_stream_ordering:
|
||||
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
|
||||
len(events_and_contexts)
|
||||
)
|
||||
@@ -176,13 +192,15 @@ class PersistEventsStore:
|
||||
"persist_events",
|
||||
self._persist_events_txn,
|
||||
events_and_contexts=events_and_contexts,
|
||||
backfilled=backfilled,
|
||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
|
||||
state_delta_for_room=state_delta_for_room,
|
||||
new_forward_extremeties=new_forward_extremeties,
|
||||
)
|
||||
persist_event_counter.inc(len(events_and_contexts))
|
||||
|
||||
if not backfilled:
|
||||
# TODO: test that this actuall works
|
||||
if stream < 0:
|
||||
# backfilled events have negative stream orderings, so we don't
|
||||
# want to set the event_persisted_position to that.
|
||||
synapse.metrics.event_persisted_position.set(
|
||||
@@ -316,8 +334,10 @@ class PersistEventsStore:
|
||||
def _persist_events_txn(
|
||||
self,
|
||||
txn: LoggingTransaction,
|
||||
*,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool,
|
||||
inhibit_local_membership_updates: bool = False,
|
||||
update_room_forward_stream_ordering: bool = True,
|
||||
state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
|
||||
new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
|
||||
):
|
||||
@@ -331,11 +351,21 @@ class PersistEventsStore:
|
||||
txn
|
||||
events_and_contexts: events to persist
|
||||
backfilled: True if the events were backfilled
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. This should be set to True
|
||||
for backfilled events because backfilled events in the past do
|
||||
not affect the current local state.
|
||||
update_room_forward_stream_ordering: Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should be set as False for backfilled
|
||||
events because backfilled events have negative stream_ordering
|
||||
and happened in the past so we know that we don't need to
|
||||
update the stream_ordering tip for the room.
|
||||
delete_existing True to purge existing table rows for the events
|
||||
from the database. This is useful when retrying due to
|
||||
IntegrityError.
|
||||
state_delta_for_room: The current-state delta for each room.
|
||||
new_forward_extremetie: The new forward extremities for each room.
|
||||
new_forward_extremeties: The new forward extremities for each room.
|
||||
For each room, a list of the event ids which are the forward
|
||||
extremities.
|
||||
|
||||
@@ -364,7 +394,9 @@ class PersistEventsStore:
|
||||
)
|
||||
|
||||
self._update_room_depths_txn(
|
||||
txn, events_and_contexts=events_and_contexts, backfilled=backfilled
|
||||
txn,
|
||||
events_and_contexts=events_and_contexts,
|
||||
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
|
||||
)
|
||||
|
||||
# _update_outliers_txn filters out any events which have already been
|
||||
@@ -398,7 +430,7 @@ class PersistEventsStore:
|
||||
txn,
|
||||
events_and_contexts=events_and_contexts,
|
||||
all_events_and_contexts=all_events_and_contexts,
|
||||
backfilled=backfilled,
|
||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||
)
|
||||
|
||||
# We call this last as it assumes we've inserted the events into
|
||||
@@ -1200,7 +1232,8 @@ class PersistEventsStore:
|
||||
self,
|
||||
txn,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool,
|
||||
*,
|
||||
update_room_forward_stream_ordering: bool = True,
|
||||
):
|
||||
"""Update min_depth for each room
|
||||
|
||||
@@ -1208,13 +1241,18 @@ class PersistEventsStore:
|
||||
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||
events_and_contexts (list[(EventBase, EventContext)]): events
|
||||
we are persisting
|
||||
backfilled (bool): True if the events were backfilled
|
||||
update_room_forward_stream_ordering: Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should be set as False for backfilled
|
||||
events because backfilled events have negative stream_ordering
|
||||
and happened in the past so we know that we don't need to
|
||||
update the stream_ordering tip for the room.
|
||||
"""
|
||||
depth_updates: Dict[str, int] = {}
|
||||
for event, context in events_and_contexts:
|
||||
# Remove the any existing cache entries for the event_ids
|
||||
txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
|
||||
if not backfilled:
|
||||
if update_room_forward_stream_ordering:
|
||||
txn.call_after(
|
||||
self.store._events_stream_cache.entity_has_changed,
|
||||
event.room_id,
|
||||
@@ -1427,7 +1465,12 @@ class PersistEventsStore:
|
||||
return [ec for ec in events_and_contexts if ec[0] not in to_remove]
|
||||
|
||||
def _update_metadata_tables_txn(
|
||||
self, txn, events_and_contexts, all_events_and_contexts, backfilled
|
||||
self,
|
||||
txn,
|
||||
*,
|
||||
events_and_contexts,
|
||||
all_events_and_contexts,
|
||||
inhibit_local_membership_updates: bool = False,
|
||||
):
|
||||
"""Update all the miscellaneous tables for new events
|
||||
|
||||
@@ -1439,7 +1482,10 @@ class PersistEventsStore:
|
||||
events that we were going to persist. This includes events
|
||||
we've already persisted, etc, that wouldn't appear in
|
||||
events_and_context.
|
||||
backfilled (bool): True if the events were backfilled
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. This should be set to True
|
||||
for backfilled events because backfilled events in the past do
|
||||
not affect the current local state.
|
||||
"""
|
||||
|
||||
# Insert all the push actions into the event_push_actions table.
|
||||
@@ -1513,7 +1559,7 @@ class PersistEventsStore:
|
||||
for event, _ in events_and_contexts
|
||||
if event.type == EventTypes.Member
|
||||
],
|
||||
backfilled=backfilled,
|
||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||
)
|
||||
|
||||
# Insert event_reference_hashes table.
|
||||
@@ -1638,8 +1684,20 @@ class PersistEventsStore:
|
||||
txn, table="event_reference_hashes", values=vals
|
||||
)
|
||||
|
||||
def _store_room_members_txn(self, txn, events, backfilled):
|
||||
"""Store a room member in the database."""
|
||||
def _store_room_members_txn(
|
||||
self, txn, events, *, inhibit_local_membership_updates: bool = False
|
||||
):
|
||||
"""
|
||||
Store a room member in the database.
|
||||
|
||||
Args:
|
||||
txn: The transaction to use.
|
||||
events: List of events to store.
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. This should be set to True
|
||||
for backfilled events because backfilled events in the past do
|
||||
not affect the current local state.
|
||||
"""
|
||||
|
||||
def non_null_str_or_none(val: Any) -> Optional[str]:
|
||||
return val if isinstance(val, str) and "\u0000" not in val else None
|
||||
@@ -1682,7 +1740,7 @@ class PersistEventsStore:
|
||||
# band membership", like a remote invite or a rejection of a remote invite.
|
||||
if (
|
||||
self.is_mine_id(event.state_key)
|
||||
and not backfilled
|
||||
and not inhibit_local_membership_updates
|
||||
and event.internal_metadata.is_outlier()
|
||||
and event.internal_metadata.is_out_of_band_membership()
|
||||
):
|
||||
|
||||
@@ -35,6 +35,7 @@ from typing import (
|
||||
)
|
||||
|
||||
import attr
|
||||
from mypy_extensions import DefaultNamedArg
|
||||
from prometheus_client import Counter, Histogram
|
||||
|
||||
from twisted.internet import defer
|
||||
@@ -112,7 +113,10 @@ times_pruned_extremities = Counter(
|
||||
@attr.s(auto_attribs=True, slots=True)
|
||||
class _EventPersistQueueItem:
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]]
|
||||
backfilled: bool
|
||||
should_calculate_state_and_forward_extrems: bool
|
||||
use_negative_stream_ordering: bool
|
||||
inhibit_local_membership_updates: bool
|
||||
update_room_forward_stream_ordering: bool
|
||||
deferred: ObservableDeferred
|
||||
|
||||
parent_opentracing_span_contexts: List = attr.ib(factory=list)
|
||||
@@ -133,7 +137,17 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
def __init__(
|
||||
self,
|
||||
per_item_callback: Callable[
|
||||
[List[Tuple[EventBase, EventContext]], bool],
|
||||
[
|
||||
List[Tuple[EventBase, EventContext]],
|
||||
DefaultNamedArg(
|
||||
bool, "should_calculate_state_and_forward_extrems" # noqa: F821
|
||||
),
|
||||
DefaultNamedArg(bool, "use_negative_stream_ordering"), # noqa: F821
|
||||
DefaultNamedArg(bool, "inhibit_local_membership_updates"), # noqa: F821
|
||||
DefaultNamedArg(
|
||||
bool, "update_room_forward_stream_ordering" # noqa: F821
|
||||
),
|
||||
],
|
||||
Awaitable[_PersistResult],
|
||||
],
|
||||
):
|
||||
@@ -150,7 +164,11 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
self,
|
||||
room_id: str,
|
||||
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool,
|
||||
*,
|
||||
should_calculate_state_and_forward_extrems: bool = True,
|
||||
use_negative_stream_ordering: bool = False,
|
||||
inhibit_local_membership_updates: bool = False,
|
||||
update_room_forward_stream_ordering: bool = True,
|
||||
) -> _PersistResult:
|
||||
"""Add events to the queue, with the given persist_event options.
|
||||
|
||||
@@ -160,7 +178,27 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
Args:
|
||||
room_id (str):
|
||||
events_and_contexts (list[(EventBase, EventContext)]):
|
||||
backfilled (bool):
|
||||
should_calculate_state_and_forward_extrems: Determines whether we
|
||||
need to calculate the state and new forward extremities for the
|
||||
room. This should be set to false for backfilled events because
|
||||
we don't need to calculate the state for backfilled events and
|
||||
there is no need to update the forward extrems because we
|
||||
already know this event happened in the past if it was
|
||||
backfilled.
|
||||
use_negative_stream_ordering: Whether to start stream_ordering on
|
||||
the negative side and decrement. This should be set as True
|
||||
for backfilled events because backfilled events get a negative
|
||||
stream ordering so they don't come down incremental `/sync`.
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. This should be set to True
|
||||
for backfilled events because backfilled events in the past do
|
||||
not affect the current local state.
|
||||
update_room_forward_stream_ordering: Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should be set as False for backfilled
|
||||
events because backfilled events have negative stream_ordering
|
||||
and happened in the past so we know that we don't need to
|
||||
update the stream_ordering tip for the room.
|
||||
|
||||
Returns:
|
||||
the result returned by the `_per_item_callback` passed to
|
||||
@@ -170,7 +208,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
|
||||
# if the last item in the queue has the same `backfilled` setting,
|
||||
# we can just add these new events to that item.
|
||||
if queue and queue[-1].backfilled == backfilled:
|
||||
if (
|
||||
queue
|
||||
and queue[-1].use_negative_stream_ordering == use_negative_stream_ordering
|
||||
):
|
||||
end_item = queue[-1]
|
||||
else:
|
||||
# need to make a new queue item
|
||||
@@ -180,7 +221,10 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
|
||||
end_item = _EventPersistQueueItem(
|
||||
events_and_contexts=[],
|
||||
backfilled=backfilled,
|
||||
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
|
||||
use_negative_stream_ordering=use_negative_stream_ordering,
|
||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
|
||||
deferred=deferred,
|
||||
)
|
||||
queue.append(end_item)
|
||||
@@ -241,7 +285,11 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
|
||||
item.opentracing_span_context = scope.span.context
|
||||
|
||||
ret = await self._per_item_callback(
|
||||
item.events_and_contexts, item.backfilled
|
||||
item.events_and_contexts,
|
||||
should_calculate_state_and_forward_extrems=item.should_calculate_state_and_forward_extrems,
|
||||
use_negative_stream_ordering=item.use_negative_stream_ordering,
|
||||
inhibit_local_membership_updates=item.inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering=item.update_room_forward_stream_ordering,
|
||||
)
|
||||
except Exception:
|
||||
with PreserveLoggingContext():
|
||||
@@ -296,15 +344,37 @@ class EventsPersistenceStorage:
|
||||
async def persist_events(
|
||||
self,
|
||||
events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool = False,
|
||||
*,
|
||||
should_calculate_state_and_forward_extrems: bool = True,
|
||||
use_negative_stream_ordering: bool = False,
|
||||
inhibit_local_membership_updates: bool = False,
|
||||
update_room_forward_stream_ordering: bool = True,
|
||||
) -> Tuple[List[EventBase], RoomStreamToken]:
|
||||
"""
|
||||
Write events to the database
|
||||
Args:
|
||||
events_and_contexts: list of tuples of (event, context)
|
||||
backfilled: Whether the results are retrieved from federation
|
||||
via backfill or not. Used to determine if they're "new" events
|
||||
which might update the current state etc.
|
||||
should_calculate_state_and_forward_extrems: Determines whether we
|
||||
need to calculate the state and new forward extremities for the
|
||||
room. This should be set to false for backfilled events because
|
||||
we don't need to calculate the state for backfilled events and
|
||||
there is no need to update the forward extrems because we
|
||||
already know this event happened in the past if it was
|
||||
backfilled.
|
||||
use_negative_stream_ordering: Whether to start stream_ordering on
|
||||
the negative side and decrement. This should be set as True
|
||||
for backfilled events because backfilled events get a negative
|
||||
stream ordering so they don't come down incremental `/sync`.
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. This should be set to True
|
||||
for backfilled events because backfilled events in the past do
|
||||
not affect the current local state.
|
||||
update_room_forward_stream_ordering: Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should be set as False for backfilled
|
||||
events because backfilled events have negative stream_ordering
|
||||
and happened in the past so we know that we don't need to
|
||||
update the stream_ordering tip for the room.
|
||||
|
||||
Returns:
|
||||
List of events persisted, the current position room stream position.
|
||||
@@ -320,7 +390,12 @@ class EventsPersistenceStorage:
|
||||
async def enqueue(item):
|
||||
room_id, evs_ctxs = item
|
||||
return await self._event_persist_queue.add_to_queue(
|
||||
room_id, evs_ctxs, backfilled=backfilled
|
||||
room_id,
|
||||
evs_ctxs,
|
||||
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
|
||||
use_negative_stream_ordering=use_negative_stream_ordering,
|
||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
|
||||
)
|
||||
|
||||
ret_vals = await yieldable_gather_results(enqueue, partitioned.items())
|
||||
@@ -350,9 +425,43 @@ class EventsPersistenceStorage:
|
||||
|
||||
@opentracing.trace
|
||||
async def persist_event(
|
||||
self, event: EventBase, context: EventContext, backfilled: bool = False
|
||||
self,
|
||||
event: EventBase,
|
||||
context: EventContext,
|
||||
*,
|
||||
should_calculate_state_and_forward_extrems: bool = True,
|
||||
use_negative_stream_ordering: bool = False,
|
||||
inhibit_local_membership_updates: bool = False,
|
||||
update_room_forward_stream_ordering: bool = True,
|
||||
) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
|
||||
"""
|
||||
Write a single event to the database.
|
||||
|
||||
Args:
|
||||
event:
|
||||
context:
|
||||
should_calculate_state_and_forward_extrems: Determines whether we
|
||||
need to calculate the state and new forward extremities for the
|
||||
room. This should be set to false for backfilled events because
|
||||
we don't need to calculate the state for backfilled events and
|
||||
there is no need to update the forward extrems because we
|
||||
already know this event happened in the past if it was
|
||||
backfilled.
|
||||
use_negative_stream_ordering: Whether to start stream_ordering on
|
||||
the negative side and decrement. This should be set as True
|
||||
for backfilled events because backfilled events get a negative
|
||||
stream ordering so they don't come down incremental `/sync`.
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. This should be set to True
|
||||
for backfilled events because backfilled events in the past do
|
||||
not affect the current local state.
|
||||
update_room_forward_stream_ordering: Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should be set as False for backfilled
|
||||
events because backfilled events have negative stream_ordering
|
||||
and happened in the past so we know that we don't need to
|
||||
update the stream_ordering tip for the room.
|
||||
|
||||
Returns:
|
||||
The event, stream ordering of `event`, and the stream ordering of the
|
||||
latest persisted event. The returned event may not match the given
|
||||
@@ -363,7 +472,12 @@ class EventsPersistenceStorage:
|
||||
# event was deduplicated. (The dict may also include other entries if
|
||||
# the event was persisted in a batch with other events.)
|
||||
replaced_events = await self._event_persist_queue.add_to_queue(
|
||||
event.room_id, [(event, context)], backfilled=backfilled
|
||||
event.room_id,
|
||||
[(event, context)],
|
||||
should_calculate_state_and_forward_extrems=should_calculate_state_and_forward_extrems,
|
||||
use_negative_stream_ordering=use_negative_stream_ordering,
|
||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
|
||||
)
|
||||
replaced_event = replaced_events.get(event.event_id)
|
||||
if replaced_event:
|
||||
@@ -379,13 +493,41 @@ class EventsPersistenceStorage:
|
||||
async def _persist_event_batch(
|
||||
self,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool = False,
|
||||
*,
|
||||
should_calculate_state_and_forward_extrems: bool = True,
|
||||
use_negative_stream_ordering: bool = False,
|
||||
inhibit_local_membership_updates: bool = False,
|
||||
update_room_forward_stream_ordering: bool = True,
|
||||
) -> Dict[str, str]:
|
||||
"""Callback for the _event_persist_queue
|
||||
|
||||
Calculates the change to current state and forward extremities, and
|
||||
persists the given events and with those updates.
|
||||
|
||||
Args:
|
||||
events_and_contexts:
|
||||
should_calculate_state_and_forward_extrems: Determines whether we
|
||||
need to calculate the state and new forward extremities for the
|
||||
room. This should be set to false for backfilled events because
|
||||
we don't need to calculate the state for backfilled events and
|
||||
there is no need to update the forward extrems because we
|
||||
already know this event happened in the past if it was
|
||||
backfilled.
|
||||
use_negative_stream_ordering: Whether to start stream_ordering on
|
||||
the negative side and decrement. This should be set as True
|
||||
for backfilled events because backfilled events get a negative
|
||||
stream ordering so they don't come down incremental `/sync`.
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. This should be set to True
|
||||
for backfilled events because backfilled events in the past do
|
||||
not affect the current local state.
|
||||
update_room_forward_stream_ordering: Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should be set as False for backfilled
|
||||
events because backfilled events have negative stream_ordering
|
||||
and happened in the past so we know that we don't need to
|
||||
update the stream_ordering tip for the room.
|
||||
|
||||
Returns:
|
||||
A dictionary of event ID to event ID we didn't persist as we already
|
||||
had another event persisted with the same TXN ID.
|
||||
@@ -448,7 +590,7 @@ class EventsPersistenceStorage:
|
||||
# device lists as stale.
|
||||
potentially_left_users: Set[str] = set()
|
||||
|
||||
if not backfilled:
|
||||
if should_calculate_state_and_forward_extrems:
|
||||
with Measure(self._clock, "_calculate_state_and_extrem"):
|
||||
# Work out the new "current state" for each room.
|
||||
# We do this by working out what the new extremities are and then
|
||||
@@ -583,7 +725,9 @@ class EventsPersistenceStorage:
|
||||
current_state_for_room=current_state_for_room,
|
||||
state_delta_for_room=state_delta_for_room,
|
||||
new_forward_extremeties=new_forward_extremeties,
|
||||
backfilled=backfilled,
|
||||
use_negative_stream_ordering=use_negative_stream_ordering,
|
||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||
update_room_forward_stream_ordering=update_room_forward_stream_ordering,
|
||||
)
|
||||
|
||||
await self._handle_potentially_left_users(potentially_left_users)
|
||||
|
||||
@@ -323,7 +323,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
|
||||
if backfill:
|
||||
self.get_success(
|
||||
self.storage.persistence.persist_events(
|
||||
[(event, context)], backfilled=True
|
||||
[(event, context)],
|
||||
# Backfilled event
|
||||
should_calculate_state_and_forward_extrems=False,
|
||||
use_negative_stream_ordering=True,
|
||||
inhibit_local_membership_updates=True,
|
||||
update_room_forward_stream_ordering=False,
|
||||
)
|
||||
)
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user