From 6cc6bdbedfb892a204cb202226d5d9fad8d49e45 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Aug 2024 11:10:34 -0500 Subject: [PATCH] Start of moving logic outside of the transaction (pre-process) --- synapse/storage/controllers/persist_events.py | 163 ++++++++++++++- synapse/storage/databases/main/events.py | 185 ++++++++---------- synapse/storage/databases/state/bg_updates.py | 6 +- 3 files changed, 250 insertions(+), 104 deletions(-) diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index d0e015bf19..9e30cec028 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -49,7 +49,7 @@ from prometheus_client import Counter, Histogram from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, EventContentFields from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME @@ -64,8 +64,16 @@ from synapse.logging.opentracing import ( from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController from synapse.storage.databases import Databases -from synapse.storage.databases.main.events import DeltaState +from synapse.storage.databases.main.events import ( + DeltaState, + SlidingSyncTableChanges, + SlidingSyncStateInsertValues, + SlidingSyncSnapshotInsertValues, +) from synapse.storage.databases.main.events_worker import EventRedactBehaviour +from synapse.storage.databases.main.events import ( + SLIDING_SYNC_RELEVANT_STATE_SET, +) from synapse.types import ( PersistedEventPosition, RoomStreamToken, @@ -604,6 +612,7 @@ class EventsPersistenceStorageController: new_forward_extremities = None state_delta_for_room = None + sliding_sync_table_changes = None if not backfilled: with Measure(self._clock, "_calculate_state_and_extrem"): @@ -617,6 +626,13 @@ class EventsPersistenceStorageController: room_id, chunk ) + with Measure(self._clock, "_calculate_sliding_sync_table_changes"): + sliding_sync_table_changes = ( + await self._calculate_sliding_sync_table_changes( + room_id, chunk, state_delta_for_room + ) + ) + with Measure(self._clock, "calculate_chain_cover_index_for_events"): # We now calculate chain ID/sequence numbers for any state events we're # persisting. We ignore out of band memberships as we're not in the room @@ -636,6 +652,7 @@ class EventsPersistenceStorageController: use_negative_stream_ordering=backfilled, inhibit_local_membership_updates=backfilled, new_event_links=new_event_links, + sliding_sync_table_changes=sliding_sync_table_changes, ) return replaced_events @@ -751,6 +768,148 @@ class EventsPersistenceStorageController: return (new_forward_extremities, delta) + async def _calculate_sliding_sync_table_changes( + self, + room_id: str, + events_and_contexts: List[Tuple[EventBase, EventContext]], + delta_state: Optional[DeltaState], + ) -> Optional[SlidingSyncTableChanges]: + """ + TODO + """ + to_insert = delta_state.to_insert + to_delete = delta_state.to_delete + + # This would only happen if someone was state reset out of the room + to_delete_membership_snapshots = { + (room_id, state_key) + for event_type, state_key in to_delete + if event_type == EventTypes.Member and self.is_mine_id(state_key) + } + + membership_snapshot_updates = {} + if to_insert: + membership_event_id_to_user_id_map: Dict[str, str] = {} + for state_key, event_id in to_insert.items(): + if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]): + membership_event_id_to_user_id_map[event_id] = state_key[1] + + if len(membership_event_id_to_user_id_map) > 0: + current_state_ids_map = ( + await self.main_store.get_partial_filtered_current_state_ids( + room_id, + state_filter=StateFilter.from_types( + SLIDING_SYNC_RELEVANT_STATE_SET + ), + ) + ) + # Since we fetched the current state before we took `to_insert`/`to_delete` + # into account, we need to do a couple fixups. + # + # Update the current_state_map with what we have `to_delete` + for state_key in to_delete: + current_state_ids_map.pop(state_key, None) + # Update the current_state_map with what we have `to_insert` + for state_key, event_id in to_insert.items(): + if state_key in SLIDING_SYNC_RELEVANT_STATE_SET: + current_state_ids_map[state_key] = event_id + + event_map = await self.main_store.get_events( + current_state_ids_map.values() + ) + + current_state_map = {} + for key, event_id in current_state_ids_map.items(): + event = event_map.get(event_id) + if event: + current_state_map[key] = event + + # Map of values to insert/update in the `sliding_sync_membership_snapshots` table + sliding_sync_insert_values = None + has_known_state = False + if current_state_map: + sliding_sync_insert_values = ( + self._get_sliding_sync_insert_values_from_state_map( + current_state_map + ) + ) + # We have current state to work from + has_known_state = True + else: + # We don't have any `current_state_events` anymore (previously + # cleared out because of `no_longer_in_room`). This can happen if + # one user is joined and another is invited (some non-join + # membership). If the joined user leaves, we are `no_longer_in_room` + # and `current_state_events` is cleared out. When the invited user + # rejects the invite (leaves the room), we will end up here. + # + # In these cases, we should inherit the meta data from the previous + # snapshot (handled by the default `ON CONFLICT ... DO UPDATE SET`). + # When using sliding sync filters, this will prevent the room from + # disappearing/appearing just because you left the room. + # + # Ideally, we could additionally assert that we're only here for + # valid non-join membership transitions. + assert delta_state.no_longer_in_room + + membership_snapshot_updates = { + (room_id, user_id): SlidingSyncSnapshotInsertValues( + membership_event_id=membership_event_id, + has_known_state=has_known_state, + ) + for membership_event_id, user_id in membership_event_id_to_user_id_map.items() + } + + return SlidingSyncTableChanges( + joined_room_updates=TODO, + to_delete_joined_rooms=TODO, + membership_snapshot_updates=membership_snapshot_updates, + to_delete_membership_snapshots=to_delete_membership_snapshots, + ) + + @classmethod + def _get_sliding_sync_insert_values_from_state_map( + cls, state_map: StateMap[EventBase] + ) -> SlidingSyncStateInsertValues: + """ + Extract the relevant state values from the `state_map` needed to insert into the + `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. + + Returns: + Map from column names (`room_type`, `is_encrypted`, `room_name`) to relevant + state values needed to insert into + the `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` tables. + """ + # Map of values to insert/update in the `sliding_sync_membership_snapshots` table + sliding_sync_insert_map: Dict[str, Optional[Union[str, bool]]] = {} + + # Parse the raw event JSON + for state_key, event in state_map.items(): + if state_key == (EventTypes.Create, ""): + room_type = event.content.get(EventContentFields.ROOM_TYPE) + sliding_sync_insert_map["room_type"] = room_type + elif state_key == (EventTypes.RoomEncryption, ""): + encryption_algorithm = event.content.get( + EventContentFields.ENCRYPTION_ALGORITHM + ) + is_encrypted = encryption_algorithm is not None + sliding_sync_insert_map["is_encrypted"] = is_encrypted + elif state_key == (EventTypes.Name, ""): + room_name = event.content.get(EventContentFields.ROOM_NAME) + sliding_sync_insert_map["room_name"] = room_name + else: + # We only expect to see events according to the + # `SLIDING_SYNC_RELEVANT_STATE_SET`. + raise AssertionError( + f"Unexpected event (we should not be fetching extra events): {state_key} {event.event_id}" + ) + + return SlidingSyncStateInsertValues( + room_type=room_type, + is_encrypted=encryption_algorithm, + room_name=room_name, + ) + async def _calculate_new_extremities( self, room_id: str, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 10865b1274..9022701e30 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -125,6 +125,33 @@ class DeltaState: return not self.to_delete and not self.to_insert and not self.no_longer_in_room +@attr.s(slots=True, auto_attribs=True) +class SlidingSyncStateInsertValues: + room_type: Optional[str] + is_encrypted: Optional[bool] + room_name: Optional[str] + + +@attr.s(slots=True, auto_attribs=True) +class SlidingSyncSnapshotInsertValues(SlidingSyncStateInsertValues): + membership_event_id: str + has_known_state: Optional[bool] + # TODO: `sender` + + +@attr.s(slots=True, auto_attribs=True) +class SlidingSyncTableChanges: + # room_id -> dict to upsert into `sliding_sync_joined_rooms` + joined_room_updates: Dict[str, SlidingSyncStateInsertValues] + # room_ids to delete from `sliding_sync_joined_rooms` + to_delete_joined_rooms: StrCollection + + # (room_id, user_id) -> dict to upsert into sliding_sync_membership_snapshots + membership_snapshot_updates: Dict[Tuple[str, str], SlidingSyncSnapshotInsertValues] + # List of (room_id, user_id) to delete from `sliding_sync_membership_snapshots` + to_delete_membership_snapshots: Set[Tuple[str, str]] + + @attr.s(slots=True, auto_attribs=True) class NewEventChainLinks: """Information about new auth chain links that need to be added to the DB. @@ -193,6 +220,7 @@ class PersistEventsStore: new_event_links: Dict[str, NewEventChainLinks], use_negative_stream_ordering: bool = False, inhibit_local_membership_updates: bool = False, + sliding_sync_table_changes: Optional[SlidingSyncTableChanges], ) -> None: """Persist a set of events alongside updates to the current state and forward extremities tables. @@ -213,6 +241,7 @@ class PersistEventsStore: from being updated by these events. This should be set to True for backfilled events because backfilled events in the past do not affect the current local state. + sliding_sync_table_changes: TODO Returns: Resolves when the events have been persisted @@ -261,6 +290,7 @@ class PersistEventsStore: state_delta_for_room=state_delta_for_room, new_forward_extremities=new_forward_extremities, new_event_links=new_event_links, + sliding_sync_table_changes=sliding_sync_table_changes, ) persist_event_counter.inc(len(events_and_contexts)) @@ -484,6 +514,7 @@ class PersistEventsStore: state_delta_for_room: Optional[DeltaState], new_forward_extremities: Optional[Set[str]], new_event_links: Dict[str, NewEventChainLinks], + sliding_sync_table_changes: Optional[SlidingSyncTableChanges], ) -> None: """Insert some number of room events into the necessary database tables. @@ -507,6 +538,7 @@ class PersistEventsStore: state_delta_for_room: The current-state delta for the room. new_forward_extremities: The new forward extremities for the room: a set of the event ids which are the forward extremities. + sliding_sync_table_changes: TODO Raises: PartialStateConflictError: if attempting to persist a partial state event in @@ -617,7 +649,11 @@ class PersistEventsStore: # NB: This function invalidates all state related caches if state_delta_for_room: self._update_current_state_txn( - txn, room_id, state_delta_for_room, min_stream_order + txn, + room_id, + state_delta_for_room, + min_stream_order, + sliding_sync_table_changes, ) self._update_sliding_sync_tables_with_new_persisted_events_txn( @@ -1179,6 +1215,7 @@ class PersistEventsStore: room_id: str, delta_state: DeltaState, stream_id: int, + sliding_sync_table_changes: Optional[SlidingSyncTableChanges], ) -> None: to_delete = delta_state.to_delete to_insert = delta_state.to_insert @@ -1197,15 +1234,11 @@ class PersistEventsStore: # Handle updating the `sliding_sync_membership_snapshots` table # # This would only happen if someone was state reset out of the room - if to_delete: + if sliding_sync_table_changes.to_delete_membership_snapshots: txn.execute_batch( "DELETE FROM sliding_sync_membership_snapshots" " WHERE room_id = ? AND user_id = ?", - ( - (room_id, state_key) - for event_type, state_key in to_delete - if event_type == EventTypes.Member and self.is_mine_id(state_key) - ), + sliding_sync_table_changes.to_delete_membership_snapshots, ) # We handle `sliding_sync_membership_snapshots` before `current_state_events` so @@ -1215,99 +1248,53 @@ class PersistEventsStore: # We do this regardless of whether the server is `no_longer_in_room` or not # because we still want a row if a local user was just left/kicked or got banned # from the room. - if to_insert: - membership_event_id_to_user_id_map: Dict[str, str] = {} - for state_key, event_id in to_insert.items(): - if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]): - membership_event_id_to_user_id_map[event_id] = state_key[1] + if sliding_sync_table_changes.membership_snapshot_updates: - if len(membership_event_id_to_user_id_map) > 0: - current_state_map = ( - self._get_relevant_sliding_sync_current_state_event_ids_txn( - txn, room_id - ) + # TODO + [ + getattr(x, attr_name) + for attr_name in ["room_type", "is_encrypted", "room_name"] + ] + + # Update the `sliding_sync_membership_snapshots` table + # + # Pulling keys/values separately is safe and will produce congruent + # lists + insert_keys = sliding_sync_membership_snapshots_insert_map.keys() + insert_values = sliding_sync_membership_snapshots_insert_map.values() + # We need to insert/update regardless of whether we have `insert_keys` + # because there are other fields in the `ON CONFLICT` upsert to run (see + # inherit case above for more context when this happens). + txn.execute_batch( + f""" + INSERT INTO sliding_sync_membership_snapshots + (room_id, user_id, membership_event_id, membership, event_stream_ordering + {("," + ", ".join(insert_keys)) if insert_keys else ""}) + VALUES ( + ?, ?, ?, + (SELECT membership FROM room_memberships WHERE event_id = ?), + (SELECT stream_ordering FROM events WHERE event_id = ?) + {("," + ", ".join("?" for _ in insert_values)) if insert_values else ""} ) - # Since we fetched the current state before we took `to_insert`/`to_delete` - # into account, we need to do a couple fixups. - # - # Update the current_state_map with what we have `to_delete` - for state_key in to_delete: - current_state_map.pop(state_key, None) - # Update the current_state_map with what we have `to_insert` - for state_key, event_id in to_insert.items(): - if state_key in SLIDING_SYNC_RELEVANT_STATE_SET: - current_state_map[state_key] = event_id - - # Map of values to insert/update in the `sliding_sync_membership_snapshots` table - sliding_sync_membership_snapshots_insert_map: Dict[ - str, Optional[Union[str, bool]] - ] = {} - if current_state_map: - sliding_sync_membership_snapshots_insert_map = ( - self._get_sliding_sync_insert_values_from_state_map_txn( - txn, current_state_map - ) - ) - # We have current state to work from - sliding_sync_membership_snapshots_insert_map["has_known_state"] = ( - True - ) - else: - # We don't have any `current_state_events` anymore (previously - # cleared out because of `no_longer_in_room`). This can happen if - # one user is joined and another is invited (some non-join - # membership). If the joined user leaves, we are `no_longer_in_room` - # and `current_state_events` is cleared out. When the invited user - # rejects the invite (leaves the room), we will end up here. - # - # In these cases, we should inherit the meta data from the previous - # snapshot (handled by the default `ON CONFLICT ... DO UPDATE SET`). - # When using sliding sync filters, this will prevent the room from - # disappearing/appearing just because you left the room. - # - # Ideally, we could additionally assert that we're only here for - # valid non-join membership transitions. - assert delta_state.no_longer_in_room - - # Update the `sliding_sync_membership_snapshots` table - # - # Pulling keys/values separately is safe and will produce congruent - # lists - insert_keys = sliding_sync_membership_snapshots_insert_map.keys() - insert_values = sliding_sync_membership_snapshots_insert_map.values() - # We need to insert/update regardless of whether we have `insert_keys` - # because there are other fields in the `ON CONFLICT` upsert to run (see - # inherit case above for more context when this happens). - txn.execute_batch( - f""" - INSERT INTO sliding_sync_membership_snapshots - (room_id, user_id, membership_event_id, membership, event_stream_ordering - {("," + ", ".join(insert_keys)) if insert_keys else ""}) - VALUES ( - ?, ?, ?, - (SELECT membership FROM room_memberships WHERE event_id = ?), - (SELECT stream_ordering FROM events WHERE event_id = ?) - {("," + ", ".join("?" for _ in insert_values)) if insert_values else ""} - ) - ON CONFLICT (room_id, user_id) - DO UPDATE SET - membership_event_id = EXCLUDED.membership_event_id, - membership = EXCLUDED.membership, - event_stream_ordering = EXCLUDED.event_stream_ordering - {("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""} - """, + ON CONFLICT (room_id, user_id) + DO UPDATE SET + membership_event_id = EXCLUDED.membership_event_id, + membership = EXCLUDED.membership, + event_stream_ordering = EXCLUDED.event_stream_ordering + {("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in insert_keys)) if insert_keys else ""} + """, + [ [ - [ - room_id, - user_id, - membership_event_id, - membership_event_id, - membership_event_id, - ] - + list(insert_values) - for membership_event_id, user_id in membership_event_id_to_user_id_map.items() - ], - ) + room_id, + user_id, + membership_event_id, + membership_event_id, + membership_event_id, + ] + + list(insert_values) + for membership_event_id, user_id in membership_event_id_to_user_id_map.items() + ], + ) if delta_state.no_longer_in_room: # Server is no longer in the room so we delete the room from @@ -1424,7 +1411,7 @@ class PersistEventsStore: # Map of values to insert/update in the `sliding_sync_joined_rooms` table sliding_sync_joined_rooms_insert_map = ( - self._get_sliding_sync_insert_values_from_state_map_txn( + self._get_sliding_sync_insert_values_from_state_ids_map_txn( txn, current_state_map ) ) @@ -1593,7 +1580,7 @@ class PersistEventsStore: return current_state_map @classmethod - def _get_sliding_sync_insert_values_from_state_map_txn( + def _get_sliding_sync_insert_values_from_state_ids_map_txn( cls, txn: LoggingTransaction, state_map: StateMap[str] ) -> Dict[str, Optional[Union[str, bool]]]: """ diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 244dedae3d..526bf7ea62 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -621,7 +621,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): # so we should have some current state for each room assert current_state_map - sliding_sync_joined_rooms_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_map_txn( + sliding_sync_joined_rooms_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_ids_map_txn( txn, current_state_map ) # We should have some insert values for each room, even if they are `None` @@ -754,7 +754,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): # for each room assert current_state_map - sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_map_txn( + sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_ids_map_txn( txn, current_state_map ) # We should have some insert values for each room, even if they are `None` @@ -854,7 +854,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): ) state_map = state_by_group[state_group] - sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_map_txn( + sliding_sync_membership_snapshots_insert_map = PersistEventsStore._get_sliding_sync_insert_values_from_state_ids_map_txn( txn, state_map ) # We should have some insert values for each room, even if they are `None`