diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 121d31b548..ac0919340b 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -38,7 +38,6 @@ from typing import ( Iterable, List, Optional, - Sequence, Set, Tuple, TypeVar, @@ -65,18 +64,9 @@ from synapse.logging.opentracing import ( from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.controllers.state import StateStorageController from synapse.storage.databases import Databases -from synapse.storage.databases.main.events import ( - SLIDING_SYNC_RELEVANT_STATE_SET, - DeltaState, - PersistEventsStore, - SlidingSyncMembershipInfo, - SlidingSyncMembershipSnapshotSharedInsertValues, - SlidingSyncStateInsertValues, - SlidingSyncTableChanges, -) +from synapse.storage.databases.main.events import DeltaState from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( - MutableStateMap, PersistedEventPosition, RoomStreamToken, StateMap, @@ -512,8 +502,10 @@ class EventsPersistenceStorageController: """ state = await self._calculate_current_state(room_id) delta = await self._calculate_state_delta(room_id, state) - sliding_sync_table_changes = await self._calculate_sliding_sync_table_changes( - room_id, [], delta + sliding_sync_table_changes = ( + await self.persist_events_store._calculate_sliding_sync_table_changes( + room_id, [], delta + ) ) await self.persist_events_store.update_current_state( @@ -619,7 +611,6 @@ class EventsPersistenceStorageController: new_forward_extremities = None state_delta_for_room = None - sliding_sync_table_changes = None if not backfilled: with Measure(self._clock, "_calculate_state_and_extrem"): @@ -633,14 +624,6 @@ class EventsPersistenceStorageController: room_id, chunk ) - if state_delta_for_room is not None: - with Measure(self._clock, "_calculate_sliding_sync_table_changes"): - sliding_sync_table_changes = ( - await self._calculate_sliding_sync_table_changes( - room_id, chunk, state_delta_for_room - ) - ) - with Measure(self._clock, "calculate_chain_cover_index_for_events"): # We now calculate chain ID/sequence numbers for any state events we're # persisting. We ignore out of band memberships as we're not in the room @@ -660,7 +643,6 @@ class EventsPersistenceStorageController: use_negative_stream_ordering=backfilled, inhibit_local_membership_updates=backfilled, new_event_links=new_event_links, - sliding_sync_table_changes=sliding_sync_table_changes, ) return replaced_events @@ -776,218 +758,6 @@ class EventsPersistenceStorageController: return (new_forward_extremities, delta) - async def _calculate_sliding_sync_table_changes( - self, - room_id: str, - events_and_contexts: Sequence[Tuple[EventBase, EventContext]], - delta_state: DeltaState, - ) -> SlidingSyncTableChanges: - """ - Calculate the changes to the `sliding_sync_membership_snapshots` and - `sliding_sync_joined_rooms` tables given the deltas that are going to be used to - update the `current_state_events` table. - - Just a bunch of pre-processing so we so we don't need to spend time in the - transaction itself gathering all of this info. It's also easier to deal with - redactions outside of a transaction. - - Args: - room_id: The room ID currently being processed. - events_and_contexts: List of tuples of (event, context) being persisted. - This is completely optional (you can pass an empty list) and will just - save us from fetching the events from the database if we already have - them. - delta_state: Deltas that are going to be used to update the - `current_state_events` table. - """ - to_insert = delta_state.to_insert - to_delete = delta_state.to_delete - - event_map = {event.event_id: event for event, _ in events_and_contexts} - - # Handle gathering info for the `sliding_sync_membership_snapshots` table - # - # This would only happen if someone was state reset out of the room - user_ids_to_delete_membership_snapshots = [ - state_key - for event_type, state_key in to_delete - if event_type == EventTypes.Member and self.is_mine_id(state_key) - ] - - membership_snapshot_shared_insert_values: ( - SlidingSyncMembershipSnapshotSharedInsertValues - ) = {} - membership_infos_to_insert_membership_snapshots: List[ - SlidingSyncMembershipInfo - ] = [] - if to_insert: - membership_event_id_to_user_id_map: Dict[str, str] = {} - for state_key, event_id in to_insert.items(): - if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]): - membership_event_id_to_user_id_map[event_id] = state_key[1] - - event_id_to_sender_map: Dict[str, str] = {} - # In normal event persist scenarios, we should be able to find the - # membership events in the `events_and_contexts` given to us but it's - # possible a state reset happened which added us to the room without a - # corresponding new membership event (reset back to a previous membership). - missing_membership_event_ids: Set[str] = set() - for membership_event_id in membership_event_id_to_user_id_map.keys(): - membership_event = event_map.get(membership_event_id) - if membership_event: - event_id_to_sender_map[membership_event_id] = ( - membership_event.sender - ) - else: - missing_membership_event_ids.add(membership_event_id) - - # Otherwise, we need to find a couple events that we were reset to. - if missing_membership_event_ids: - remaining_event_id_to_sender_map = ( - await self.main_store.get_sender_for_event_ids( - missing_membership_event_ids - ) - ) - # There shouldn't be any missing events - assert ( - remaining_event_id_to_sender_map.keys() - == missing_membership_event_ids - ), missing_membership_event_ids.difference( - remaining_event_id_to_sender_map.keys() - ) - event_id_to_sender_map.update(remaining_event_id_to_sender_map) - - membership_infos_to_insert_membership_snapshots = [ - SlidingSyncMembershipInfo( - user_id=user_id, - sender=event_id_to_sender_map[membership_event_id], - membership_event_id=membership_event_id, - ) - for membership_event_id, user_id in membership_event_id_to_user_id_map.items() - ] - - if membership_infos_to_insert_membership_snapshots: - current_state_ids_map: MutableStateMap[str] = dict( - await self.main_store.get_partial_filtered_current_state_ids( - room_id, - state_filter=StateFilter.from_types( - SLIDING_SYNC_RELEVANT_STATE_SET - ), - ) - ) - # Since we fetched the current state before we took `to_insert`/`to_delete` - # into account, we need to do a couple fixups. - # - # Update the current_state_map with what we have `to_delete` - for state_key in to_delete: - current_state_ids_map.pop(state_key, None) - # Update the current_state_map with what we have `to_insert` - for state_key, event_id in to_insert.items(): - if state_key in SLIDING_SYNC_RELEVANT_STATE_SET: - current_state_ids_map[state_key] = event_id - - fetched_events = await self.main_store.get_events( - current_state_ids_map.values() - ) - - current_state_map: StateMap[EventBase] = { - state_key: fetched_events[event_id] - for state_key, event_id in current_state_ids_map.items() - } - - if current_state_map: - state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map( - current_state_map - ) - membership_snapshot_shared_insert_values.update(state_insert_values) - # We have current state to work from - membership_snapshot_shared_insert_values["has_known_state"] = True - else: - # We don't have any `current_state_events` anymore (previously - # cleared out because of `no_longer_in_room`). This can happen if - # one user is joined and another is invited (some non-join - # membership). If the joined user leaves, we are `no_longer_in_room` - # and `current_state_events` is cleared out. When the invited user - # rejects the invite (leaves the room), we will end up here. - # - # In these cases, we should inherit the meta data from the previous - # snapshot so we shouldn't update any of the state values. When - # using sliding sync filters, this will prevent the room from - # disappearing/appearing just because you left the room. - # - # Ideally, we could additionally assert that we're only here for - # valid non-join membership transitions. - assert delta_state.no_longer_in_room - - # Handle gathering info for the `sliding_sync_joined_rooms` table - # - # We only deal with - # updating the state related columns. The - # `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event - # persisting stack (see - # `_update_sliding_sync_tables_with_new_persisted_events_txn()`) - # - joined_room_updates: SlidingSyncStateInsertValues = {} - if not delta_state.no_longer_in_room: - # Look through the items we're going to insert into the current state to see - # if there is anything that we care about and should also update in the - # `sliding_sync_joined_rooms` table. - current_state_ids_map = {} - for state_key, event_id in to_insert.items(): - if state_key in SLIDING_SYNC_RELEVANT_STATE_SET: - current_state_ids_map[state_key] = event_id - - # Get the full event objects for the current state events - # - # In normal event persist scenarios, we should be able to find the state - # events in the `events_and_contexts` given to us but it's possible a state - # reset happened which that reset back to a previous state. - current_state_map = {} - missing_event_ids: Set[str] = set() - for state_key, event_id in current_state_ids_map.items(): - event = event_map.get(event_id) - if event: - current_state_map[state_key] = event - else: - missing_event_ids.add(event_id) - - # Otherwise, we need to find a couple events that we were reset to. - if missing_event_ids: - remaining_events = await self.main_store.get_events( - current_state_ids_map.values() - ) - # There shouldn't be any missing events - assert ( - remaining_events.keys() == missing_event_ids - ), missing_event_ids.difference(remaining_events.keys()) - for event in remaining_events.values(): - current_state_map[(event.type, event.state_key)] = event - - joined_room_updates = ( - PersistEventsStore._get_sliding_sync_insert_values_from_state_map( - current_state_map - ) - ) - - # If something is being deleted from the state, we need to clear it out - for state_key in to_delete: - if state_key == (EventTypes.Create, ""): - joined_room_updates["room_type"] = None - elif state_key == (EventTypes.RoomEncryption, ""): - joined_room_updates["is_encrypted"] = False - elif state_key == (EventTypes.Name, ""): - joined_room_updates["room_name"] = None - - return SlidingSyncTableChanges( - room_id=room_id, - # For `sliding_sync_joined_rooms` - joined_room_updates=joined_room_updates, - # For `sliding_sync_membership_snapshots` - membership_snapshot_shared_insert_values=membership_snapshot_shared_insert_values, - to_insert_membership_snapshots=membership_infos_to_insert_membership_snapshots, - to_delete_membership_snapshots=user_ids_to_delete_membership_snapshots, - ) - async def _calculate_new_extremities( self, room_id: str, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 85c8bbef8d..fe93e12eeb 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -32,6 +32,7 @@ from typing import ( Iterable, List, Optional, + Sequence, Set, Tuple, cast, @@ -75,6 +76,7 @@ from synapse.types import ( get_domain_from_id, ) from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES +from synapse.types.state import StateFilter from synapse.util import json_encoder from synapse.util.iterutils import batch_iter, sorted_topologically from synapse.util.stringutils import non_null_str_or_none @@ -245,7 +247,6 @@ class PersistEventsStore: new_event_links: Dict[str, NewEventChainLinks], use_negative_stream_ordering: bool = False, inhibit_local_membership_updates: bool = False, - sliding_sync_table_changes: Optional[SlidingSyncTableChanges], ) -> None: """Persist a set of events alongside updates to the current state and forward extremities tables. @@ -306,6 +307,14 @@ class PersistEventsStore: event.internal_metadata.stream_ordering = stream event.internal_metadata.instance_name = self._instance_name + sliding_sync_table_changes = None + if state_delta_for_room is not None: + sliding_sync_table_changes = ( + await self._calculate_sliding_sync_table_changes( + room_id, events_and_contexts, state_delta_for_room + ) + ) + await self.db_pool.runInteraction( "persist_events", self._persist_events_txn, @@ -342,6 +351,218 @@ class PersistEventsStore: (room_id,), frozenset(new_forward_extremities) ) + async def _calculate_sliding_sync_table_changes( + self, + room_id: str, + events_and_contexts: Sequence[Tuple[EventBase, EventContext]], + delta_state: DeltaState, + ) -> SlidingSyncTableChanges: + """ + Calculate the changes to the `sliding_sync_membership_snapshots` and + `sliding_sync_joined_rooms` tables given the deltas that are going to be used to + update the `current_state_events` table. + + Just a bunch of pre-processing so we so we don't need to spend time in the + transaction itself gathering all of this info. It's also easier to deal with + redactions outside of a transaction. + + Args: + room_id: The room ID currently being processed. + events_and_contexts: List of tuples of (event, context) being persisted. + This is completely optional (you can pass an empty list) and will just + save us from fetching the events from the database if we already have + them. + delta_state: Deltas that are going to be used to update the + `current_state_events` table. + """ + to_insert = delta_state.to_insert + to_delete = delta_state.to_delete + + event_map = {event.event_id: event for event, _ in events_and_contexts} + + # Handle gathering info for the `sliding_sync_membership_snapshots` table + # + # This would only happen if someone was state reset out of the room + user_ids_to_delete_membership_snapshots = [ + state_key + for event_type, state_key in to_delete + if event_type == EventTypes.Member and self.is_mine_id(state_key) + ] + + membership_snapshot_shared_insert_values: ( + SlidingSyncMembershipSnapshotSharedInsertValues + ) = {} + membership_infos_to_insert_membership_snapshots: List[ + SlidingSyncMembershipInfo + ] = [] + if to_insert: + membership_event_id_to_user_id_map: Dict[str, str] = {} + for state_key, event_id in to_insert.items(): + if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]): + membership_event_id_to_user_id_map[event_id] = state_key[1] + + event_id_to_sender_map: Dict[str, str] = {} + # In normal event persist scenarios, we should be able to find the + # membership events in the `events_and_contexts` given to us but it's + # possible a state reset happened which added us to the room without a + # corresponding new membership event (reset back to a previous membership). + missing_membership_event_ids: Set[str] = set() + for membership_event_id in membership_event_id_to_user_id_map.keys(): + membership_event = event_map.get(membership_event_id) + if membership_event: + event_id_to_sender_map[membership_event_id] = ( + membership_event.sender + ) + else: + missing_membership_event_ids.add(membership_event_id) + + # Otherwise, we need to find a couple events that we were reset to. + if missing_membership_event_ids: + remaining_event_id_to_sender_map = ( + await self.store.get_sender_for_event_ids( + missing_membership_event_ids + ) + ) + # There shouldn't be any missing events + assert ( + remaining_event_id_to_sender_map.keys() + == missing_membership_event_ids + ), missing_membership_event_ids.difference( + remaining_event_id_to_sender_map.keys() + ) + event_id_to_sender_map.update(remaining_event_id_to_sender_map) + + membership_infos_to_insert_membership_snapshots = [ + SlidingSyncMembershipInfo( + user_id=user_id, + sender=event_id_to_sender_map[membership_event_id], + membership_event_id=membership_event_id, + ) + for membership_event_id, user_id in membership_event_id_to_user_id_map.items() + ] + + if membership_infos_to_insert_membership_snapshots: + current_state_ids_map: MutableStateMap[str] = dict( + await self.store.get_partial_filtered_current_state_ids( + room_id, + state_filter=StateFilter.from_types( + SLIDING_SYNC_RELEVANT_STATE_SET + ), + ) + ) + # Since we fetched the current state before we took `to_insert`/`to_delete` + # into account, we need to do a couple fixups. + # + # Update the current_state_map with what we have `to_delete` + for state_key in to_delete: + current_state_ids_map.pop(state_key, None) + # Update the current_state_map with what we have `to_insert` + for state_key, event_id in to_insert.items(): + if state_key in SLIDING_SYNC_RELEVANT_STATE_SET: + current_state_ids_map[state_key] = event_id + + fetched_events = await self.store.get_events( + current_state_ids_map.values() + ) + + current_state_map: StateMap[EventBase] = { + state_key: fetched_events[event_id] + for state_key, event_id in current_state_ids_map.items() + } + + if current_state_map: + state_insert_values = PersistEventsStore._get_sliding_sync_insert_values_from_state_map( + current_state_map + ) + membership_snapshot_shared_insert_values.update(state_insert_values) + # We have current state to work from + membership_snapshot_shared_insert_values["has_known_state"] = True + else: + # We don't have any `current_state_events` anymore (previously + # cleared out because of `no_longer_in_room`). This can happen if + # one user is joined and another is invited (some non-join + # membership). If the joined user leaves, we are `no_longer_in_room` + # and `current_state_events` is cleared out. When the invited user + # rejects the invite (leaves the room), we will end up here. + # + # In these cases, we should inherit the meta data from the previous + # snapshot so we shouldn't update any of the state values. When + # using sliding sync filters, this will prevent the room from + # disappearing/appearing just because you left the room. + # + # Ideally, we could additionally assert that we're only here for + # valid non-join membership transitions. + assert delta_state.no_longer_in_room + + # Handle gathering info for the `sliding_sync_joined_rooms` table + # + # We only deal with + # updating the state related columns. The + # `event_stream_ordering`/`bump_stamp` are updated elsewhere in the event + # persisting stack (see + # `_update_sliding_sync_tables_with_new_persisted_events_txn()`) + # + joined_room_updates: SlidingSyncStateInsertValues = {} + if not delta_state.no_longer_in_room: + # Look through the items we're going to insert into the current state to see + # if there is anything that we care about and should also update in the + # `sliding_sync_joined_rooms` table. + current_state_ids_map = {} + for state_key, event_id in to_insert.items(): + if state_key in SLIDING_SYNC_RELEVANT_STATE_SET: + current_state_ids_map[state_key] = event_id + + # Get the full event objects for the current state events + # + # In normal event persist scenarios, we should be able to find the state + # events in the `events_and_contexts` given to us but it's possible a state + # reset happened which that reset back to a previous state. + current_state_map = {} + missing_event_ids: Set[str] = set() + for state_key, event_id in current_state_ids_map.items(): + event = event_map.get(event_id) + if event: + current_state_map[state_key] = event + else: + missing_event_ids.add(event_id) + + # Otherwise, we need to find a couple events that we were reset to. + if missing_event_ids: + remaining_events = await self.store.get_events( + current_state_ids_map.values() + ) + # There shouldn't be any missing events + assert ( + remaining_events.keys() == missing_event_ids + ), missing_event_ids.difference(remaining_events.keys()) + for event in remaining_events.values(): + current_state_map[(event.type, event.state_key)] = event + + joined_room_updates = ( + PersistEventsStore._get_sliding_sync_insert_values_from_state_map( + current_state_map + ) + ) + + # If something is being deleted from the state, we need to clear it out + for state_key in to_delete: + if state_key == (EventTypes.Create, ""): + joined_room_updates["room_type"] = None + elif state_key == (EventTypes.RoomEncryption, ""): + joined_room_updates["is_encrypted"] = False + elif state_key == (EventTypes.Name, ""): + joined_room_updates["room_name"] = None + + return SlidingSyncTableChanges( + room_id=room_id, + # For `sliding_sync_joined_rooms` + joined_room_updates=joined_room_updates, + # For `sliding_sync_membership_snapshots` + membership_snapshot_shared_insert_values=membership_snapshot_shared_insert_values, + to_insert_membership_snapshots=membership_infos_to_insert_membership_snapshots, + to_delete_membership_snapshots=user_ids_to_delete_membership_snapshots, + ) + async def calculate_chain_cover_index_for_events( self, room_id: str, events: Collection[EventBase] ) -> Dict[str, NewEventChainLinks]: diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py index a2122de7ee..9e5c0e2bf8 100644 --- a/tests/storage/test_events.py +++ b/tests/storage/test_events.py @@ -1432,27 +1432,19 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): ) ) event_chunk = [message_tuple] - delta_state = DeltaState( - # This is the state reset part. We're removing the room name state. - to_delete=[(EventTypes.Name, "")], - to_insert={}, - ) - assert self.storage_controllers.persistence is not None - sliding_sync_table_changes = self.get_success( - self.storage_controllers.persistence._calculate_sliding_sync_table_changes( - room_id, event_chunk, delta_state - ) - ) self.get_success( self.persist_events_store._persist_events_and_state_updates( room_id, event_chunk, - state_delta_for_room=delta_state, + state_delta_for_room=DeltaState( + # This is the state reset part. We're removing the room name state. + to_delete=[(EventTypes.Name, "")], + to_insert={}, + ), new_forward_extremities={message_tuple[0].event_id}, use_negative_stream_ordering=False, inhibit_local_membership_updates=False, new_event_links={}, - sliding_sync_table_changes=sliding_sync_table_changes, ) ) @@ -2680,27 +2672,19 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): ) ) event_chunk = [message_tuple] - delta_state = DeltaState( - # This is the state reset part. We're removing the room name state. - to_delete=[(EventTypes.Member, user1_id)], - to_insert={}, - ) - assert self.storage_controllers.persistence is not None - sliding_sync_table_changes = self.get_success( - self.storage_controllers.persistence._calculate_sliding_sync_table_changes( - room_id, event_chunk, delta_state - ) - ) self.get_success( self.persist_events_store._persist_events_and_state_updates( room_id, event_chunk, - state_delta_for_room=delta_state, + state_delta_for_room=DeltaState( + # This is the state reset part. We're removing the room name state. + to_delete=[(EventTypes.Member, user1_id)], + to_insert={}, + ), new_forward_extremities={message_tuple[0].event_id}, use_negative_stream_ordering=False, inhibit_local_membership_updates=False, new_event_links={}, - sliding_sync_table_changes=sliding_sync_table_changes, ) )