Refactor backfilled behavior into specific function parameters
Part of https://github.com/matrix-org/synapse/issues/11300
This commit is contained in:
@@ -1821,7 +1821,8 @@ class FederationEventHandler:
|
||||
self,
|
||||
room_id: str,
|
||||
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool = False,
|
||||
*,
|
||||
inhibit_push_notifications: bool = False,
|
||||
) -> int:
|
||||
"""Persists events and tells the notifier/pushers about them, if
|
||||
necessary.
|
||||
@@ -1831,8 +1832,9 @@ class FederationEventHandler:
|
||||
event_and_contexts: Sequence of events with their associated
|
||||
context that should be persisted. All events must belong to
|
||||
the same room.
|
||||
backfilled: Whether these events are a result of
|
||||
backfilling or not
|
||||
inhibit_push_notifications: Whether to stop the notifiers/pushers
|
||||
from knowing about the event. Usually this is done for any backfilled
|
||||
event.
|
||||
|
||||
Returns:
|
||||
The stream ID after which all events have been persisted.
|
||||
@@ -1850,7 +1852,7 @@ class FederationEventHandler:
|
||||
store=self._store,
|
||||
room_id=room_id,
|
||||
event_and_contexts=batch,
|
||||
backfilled=backfilled,
|
||||
inhibit_push_notifications=inhibit_push_notifications,
|
||||
)
|
||||
return result["max_stream_id"]
|
||||
else:
|
||||
@@ -1867,7 +1869,7 @@ class FederationEventHandler:
|
||||
# If there's an expiry timestamp on the event, schedule its expiry.
|
||||
self._message_handler.maybe_schedule_expiry(event)
|
||||
|
||||
if not backfilled: # Never notify for backfilled events
|
||||
if not inhibit_push_notifications: # Never notify for backfilled events
|
||||
for event in events:
|
||||
await self._notify_persisted_event(event, max_stream_token)
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
"rejected_reason": .., // The event.rejected_reason field
|
||||
"context": { .. serialized event context .. },
|
||||
}],
|
||||
"backfilled": false
|
||||
"inhibit_push_notifications": false
|
||||
}
|
||||
|
||||
200 OK
|
||||
@@ -69,14 +69,17 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
self.federation_event_handler = hs.get_federation_event_handler()
|
||||
|
||||
@staticmethod
|
||||
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
|
||||
async def _serialize_payload(
|
||||
store, room_id, event_and_contexts, inhibit_push_notifications: bool = False
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
store
|
||||
room_id (str)
|
||||
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
|
||||
backfilled (bool): Whether or not the events are the result of
|
||||
backfilling
|
||||
inhibit_push_notifications (bool): Whether to stop the notifiers/pushers
|
||||
from knowing about the event. Usually this is done for any backfilled
|
||||
event.
|
||||
"""
|
||||
event_payloads = []
|
||||
for event, context in event_and_contexts:
|
||||
@@ -96,7 +99,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
|
||||
payload = {
|
||||
"events": event_payloads,
|
||||
"backfilled": backfilled,
|
||||
"inhibit_push_notifications": inhibit_push_notifications,
|
||||
"room_id": room_id,
|
||||
}
|
||||
|
||||
@@ -107,7 +110,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
content = parse_json_object_from_request(request)
|
||||
|
||||
room_id = content["room_id"]
|
||||
backfilled = content["backfilled"]
|
||||
inhibit_push_notifications = content["inhibit_push_notifications"]
|
||||
|
||||
event_payloads = content["events"]
|
||||
|
||||
@@ -132,7 +135,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||
logger.info("Got %d events from federation", len(event_and_contexts))
|
||||
|
||||
max_stream_id = await self.federation_event_handler.persist_events_and_notify(
|
||||
room_id, event_and_contexts, backfilled
|
||||
room_id, event_and_contexts, inhibit_push_notifications
|
||||
)
|
||||
|
||||
return 200, {"max_stream_id": max_stream_id}
|
||||
|
||||
@@ -121,10 +121,11 @@ class PersistEventsStore:
|
||||
async def _persist_events_and_state_updates(
|
||||
self,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
*,
|
||||
current_state_for_room: Dict[str, StateMap[str]],
|
||||
state_delta_for_room: Dict[str, DeltaState],
|
||||
new_forward_extremeties: Dict[str, List[str]],
|
||||
backfilled: bool = False,
|
||||
use_negative_stream_ordering: bool = False,
|
||||
) -> None:
|
||||
"""Persist a set of events alongside updates to the current state and
|
||||
forward extremities tables.
|
||||
@@ -137,7 +138,9 @@ class PersistEventsStore:
|
||||
room state
|
||||
new_forward_extremities: Map from room_id to list of event IDs
|
||||
that are the new forward extremities of the room.
|
||||
backfilled
|
||||
use_negative_stream_ordering: Whether to start stream_ordering on
|
||||
the negative side and decrement. Usually this is done for any
|
||||
backfilled event.
|
||||
|
||||
Returns:
|
||||
Resolves when the events have been persisted
|
||||
@@ -159,7 +162,7 @@ class PersistEventsStore:
|
||||
#
|
||||
# Note: Multiple instances of this function cannot be in flight at
|
||||
# the same time for the same room.
|
||||
if backfilled:
|
||||
if use_negative_stream_ordering:
|
||||
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
|
||||
len(events_and_contexts)
|
||||
)
|
||||
@@ -182,7 +185,8 @@ class PersistEventsStore:
|
||||
)
|
||||
persist_event_counter.inc(len(events_and_contexts))
|
||||
|
||||
if not backfilled:
|
||||
# TODO: test that this actuall works
|
||||
if stream < 0:
|
||||
# backfilled events have negative stream orderings, so we don't
|
||||
# want to set the event_persisted_position to that.
|
||||
synapse.metrics.event_persisted_position.set(
|
||||
@@ -1200,7 +1204,8 @@ class PersistEventsStore:
|
||||
self,
|
||||
txn,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool,
|
||||
*,
|
||||
update_room_forward_stream_ordering: bool = True,
|
||||
):
|
||||
"""Update min_depth for each room
|
||||
|
||||
@@ -1208,13 +1213,16 @@ class PersistEventsStore:
|
||||
txn (twisted.enterprise.adbapi.Connection): db connection
|
||||
events_and_contexts (list[(EventBase, EventContext)]): events
|
||||
we are persisting
|
||||
backfilled (bool): True if the events were backfilled
|
||||
update_room_forward_stream_ordering (bool): Whether to update the
|
||||
stream_ordering position to mark the latest event as the front
|
||||
of the room. This should only be set as false for backfilled
|
||||
events.
|
||||
"""
|
||||
depth_updates: Dict[str, int] = {}
|
||||
for event, context in events_and_contexts:
|
||||
# Remove the any existing cache entries for the event_ids
|
||||
txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
|
||||
if not backfilled:
|
||||
if update_room_forward_stream_ordering:
|
||||
txn.call_after(
|
||||
self.store._events_stream_cache.entity_has_changed,
|
||||
event.room_id,
|
||||
@@ -1638,8 +1646,19 @@ class PersistEventsStore:
|
||||
txn, table="event_reference_hashes", values=vals
|
||||
)
|
||||
|
||||
def _store_room_members_txn(self, txn, events, backfilled):
|
||||
"""Store a room member in the database."""
|
||||
def _store_room_members_txn(
|
||||
self, txn, events, *, inhibit_local_membership_updates: bool = False
|
||||
):
|
||||
"""
|
||||
Store a room member in the database.
|
||||
|
||||
Args:
|
||||
txn: The transaction to use.
|
||||
events: List of events to store.
|
||||
inhibit_local_membership_updates: Stop the local_current_membership
|
||||
from being updated by these events. Usually this is done for
|
||||
backfilled events.
|
||||
"""
|
||||
|
||||
def non_null_str_or_none(val: Any) -> Optional[str]:
|
||||
return val if isinstance(val, str) and "\u0000" not in val else None
|
||||
@@ -1682,7 +1701,7 @@ class PersistEventsStore:
|
||||
# band membership", like a remote invite or a rejection of a remote invite.
|
||||
if (
|
||||
self.is_mine_id(event.state_key)
|
||||
and not backfilled
|
||||
and not inhibit_local_membership_updates
|
||||
and event.internal_metadata.is_outlier()
|
||||
and event.internal_metadata.is_out_of_band_membership()
|
||||
):
|
||||
|
||||
@@ -379,13 +379,19 @@ class EventsPersistenceStorage:
|
||||
async def _persist_event_batch(
|
||||
self,
|
||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||
backfilled: bool = False,
|
||||
should_calculate_state_and_forward_extrems: bool = True,
|
||||
) -> Dict[str, str]:
|
||||
"""Callback for the _event_persist_queue
|
||||
|
||||
Calculates the change to current state and forward extremities, and
|
||||
persists the given events and with those updates.
|
||||
|
||||
Args:
|
||||
events_and_contexts:
|
||||
should_calculate_state_and_forward_extrems: Determines whether we
|
||||
need to calculate the state and new forward extremities for the
|
||||
room. This should be set to false for backfilled events.
|
||||
|
||||
Returns:
|
||||
A dictionary of event ID to event ID we didn't persist as we already
|
||||
had another event persisted with the same TXN ID.
|
||||
@@ -448,7 +454,7 @@ class EventsPersistenceStorage:
|
||||
# device lists as stale.
|
||||
potentially_left_users: Set[str] = set()
|
||||
|
||||
if not backfilled:
|
||||
if should_calculate_state_and_forward_extrems:
|
||||
with Measure(self._clock, "_calculate_state_and_extrem"):
|
||||
# Work out the new "current state" for each room.
|
||||
# We do this by working out what the new extremities are and then
|
||||
|
||||
Reference in New Issue
Block a user